## Concurrency & Asynchrony in Scala

by Greg Dorrell

Note:
- I work for HomeAway, which is part of Expedia Group. Both HomeAway and Hotels.com are hiring.

Scala is a language with lots of choice

Note:
- If we look at programming style...
- You can write Scala as better Java.
- You can use the Akka ecosystem and do reactive programming.
- You can write functional code (and even then, it can be cats or scalaz).
- This can be overwhelming, especially as a beginner. Which style of Scala should you use?
- In that scenario, just decide on one as a team and be willing to evolve as you learn more.
- I see this variety of options as an advantage.
- I like that I've been able to start with better Java, and gradually introduce reactive or functional code.
- I find it impressive that we can use different techniques within one language.

Concurrency libraries:

- Futures
- Akka
- Monix
- Cats Effect
- ZIO
- ...

Note:
- The same is true of concurrency. There are a lot of different options out there.
- So today we're going to take a look at a number of them.
- I want to give you an idea of where you might use Akka Streams, or why you might use Monix Task instead of Scala's Future.
- But I'm also going to deep dive on Futures.
- And we'll look at some of the problems I've hit with Futures, which help motivate using some of the other options.

My first contact with concurrency in Scala

Note:
- So, we'll start where I started, which was back in my first job where I used Scala.
- I was using Scala for some data-migration scripts.
- They would fetch a document from one place, transform it a bit, and upload it to a new system via a REST endpoint.
- But those data-migration scripts were annoyingly slow pushing data to the new system via that REST endpoint.
- I realised that uploading this collection of records to the new datasource was an "embarrassingly parallel" problem.
- Back in 2011, Scala 2.9 had just come out with an awesome new feature: parallel collections!

## Parallel collections

```scala
def upload(doc: String): Unit = ...

val docs: List[String] = ...

// sequential
docs.foreach(upload)

// parallel
docs.par.foreach(upload)
```

Note:
- Already this was way better than my prior experience with concurrency: Java Threads, and all the fun that goes along with that (semaphores, locking).
- As you start using immutability, those issues around modifying shared state go away. But there are still other issues we can hit, and we'll take a look at those.

- Easy to use ✔
- Works with existing blocking libraries ✔
- It's still blocking ✘
- Limited parallelism ✘

Note:
- Normal collections API, just add `.par`. No need for new concepts like Futures or Actors.
- Who is familiar with the ExecutionContext?
- The global EC defaults to the number of cores multiplied by 2. So the concurrency of this is limited.
- The work is executed using a `TaskSupport`; by default it uses the global ExecutionContext.
- That means you have, by default, num_cores * 2 threads to work with.
- The global EC is great for CPU-bound work, but not for blocking.
- So this won't scale up too far, and may cause contention issues in our app if the ExecutionContext is used for anything else.
- It would be bad to do this per web request!
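If you do want to keep using parallel collections but isolate them from the shared global pool, you can give the collection its own `TaskSupport` backed by a dedicated pool. A minimal sketch, assuming Scala 2.12-style parallel collections; `docs` and `upload` are placeholders:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Placeholders standing in for the real documents and upload call.
val docs: List[String] = List("doc-1", "doc-2", "doc-3")
def upload(doc: String): Unit = println(s"uploading $doc")

// Give this parallel collection its own pool of 10 threads,
// instead of sharing the default (global) execution context.
val parDocs = docs.par
parDocs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))

parDocs.foreach(upload)
```

The uploads now block threads in that dedicated pool rather than the shared one, although the concurrency is still capped at the pool size.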
## Futures

```scala
def upload(doc: String): Future[Unit] = ...

val docs: Seq[String] = ...

docs.foreach(upload)
```

Note:
- So here, we've changed `upload` to return a Future.
- Unfortunately, there's a problem here.
- I'm uploading all the documents, but I'm not waiting for them to finish uploading.
- If this was the end of my application, the app would just exit without finishing the upload.

## Futures

```scala
def upload(doc: String): Future[Unit] = ...

val docs: Seq[String] = ...

val eventualUpdates = docs.map(upload)
val updateAll = Future.sequence(eventualUpdates)
Await.result(updateAll, Duration.Inf)
```

Note:
- We need to keep the Futures that `upload` returns and wait for them to finish.

Does this solve the problems of parallel collections?

Note:
- We've gone to the effort of learning about Future, how to `sequence` it, and how to await its result.
- So you would hope it solves our problems...

It depends

Naive version of `upload`

```scala
import scala.concurrent.ExecutionContext.Implicits.global

def upload(doc: String) = Future(
  blockingClient.post("/someUrl", doc)
)
```

Note:
- All the same problems as parallel collections.
- Also, this context is shared across your app... so everything else in your app has to wait.
- If you're using Akka and getting the ExecutionContext from there, same problem.

Risky solution

```scala
def upload(doc: String)(implicit ec: ExecutionContext) = Future(
  blockingClient.post("/someUrl", doc)
)
```

Note:
- This passes off the responsibility to the caller.
- But the caller has no idea what you're doing with the execution context.
- They'll probably use the global one, and it won't go too well.
- I've been bitten by this. So when I see application code taking an implicit execution context, I'm suspicious and have to go and look at the implementation. Avoid this.

Custom ExecutionContext

```scala
import java.util.concurrent.Executors

implicit val ioEC: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

def upload(doc: String) = Future(
  blockingClient.post("/someUrl", doc)
)
```

Note:
- This will work.
- It won't slow down other parts of your application.
- You even have the ability to control how much concurrency there is.
- You can get to quite a lot of threads before it's a problem.
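Putting the last few slides together, here is a rough end-to-end sketch: a bounded pool dedicated to the blocking calls, plus `Future.sequence` and `Await` so the app doesn't exit early. The `upload` body is a placeholder standing in for `blockingClient.post`:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A dedicated, bounded pool just for the blocking calls,
// as on the "Custom ExecutionContext" slide.
implicit val ioEC: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

// Placeholder for blockingClient.post("/someUrl", doc):
// pretend each upload blocks for a little while.
def upload(doc: String): Future[Unit] = Future {
  Thread.sleep(100)
  println(s"uploaded $doc")
}

val docs: Seq[String] = (1 to 100).map(i => s"doc-$i")

// Start all the uploads (at most 10 run at once, bounded by the pool)
// and wait for them all before letting the app exit.
val updateAll: Future[Seq[Unit]] = Future.sequence(docs.map(upload))
Await.result(updateAll, Duration.Inf)
```

The fixed pool caps the concurrency at 10; any extra uploads simply queue up on the executor rather than spawning new threads.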
scala.concurrent.blocking

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking

def upload(doc: String) = Future(
  blocking {
    blockingClient.post("/someUrl", doc)
  }
)
```

Note:
- Here we wrap the blocking bit of code inside a `scala.concurrent.blocking` wrapper.
- This lets the execution context know that the code is blocking.
- The global execution context then doesn't count that work towards its concurrency level.
- So this will allow the ExecutionContext to create more threads, above the limit of CPUs * 2.
- So it won't block your app... however, it is now unbounded.
- A thread by default takes 1MB of memory, and that's just the memory overhead.
- So if we had 1000 documents, this would create 1000 new threads, and that would (for a short while) use 1GB of memory.
- When you start to have a few thousand documents, this can be a problem. So be careful.

Asynchronous non-blocking upload

```scala
def upload(doc: String) = asyncClient.post("/someUrl", doc)
```

Note:
- For a long time, I didn't really understand how you could actually do this asynchronously.
- Understanding that is a topic for another talk. But at a glance:
- With Java IO you have streams, and when you read a byte from a stream you block the thread you're on until it's available.
- With Java NIO (hand-wave, there's actually NIO and NIO2!) you have buffers (with associated channels). And you can check if there's data to read in a buffer, but not block if there isn't.
- When there's something useful in the buffer, you can read it out and put it somewhere. And if you completed a Scala Promise with that, you'd have async IO.
- So something has to check those buffers and read/write data to them. But you can have one or two threads managing many sockets.
- In my current team, we're fans of Twitter's Finagle, which uses Netty, which operates similarly to Java NIO.
- This is also unbounded. In this scenario we don't have to worry about running out of memory...
- However, we can overload the remote webservice we're calling, or the database behind it.
- And that's a scenario I've hit at work: our Cassandra cluster's reads were impacted when we did lots of writes.
- So we decided to throttle our writes.

Throttling Future.traverse

[demo](https://embed.scalafiddle.io/embed?sfid=ZRt7Df5/10)

Note:
- We had this come up in a batch job where we were just using Futures, and I was surprised to find out you can do the throttling yourself with Futures!
- But it's limited. If we do batches of 10, we have to wait for the slowest upload in each batch to finish before we start the next batch.
- So instead of taking number-of-batches * average response time, it takes roughly number-of-batches * P90 response time.
- Instead, a better option is an AsyncSemaphore.

Monix

A library for asynchronous programming in Scala

Note:
- What does that mean? It sounds super general.
- Well, Monix actually has a few different useful sub-projects.

monix-execution provides utilities for working with `scala.concurrent`

Note:
- One that's of interest to us is AsyncSemaphore...

Monix AsyncSemaphore ([demo](https://embed.scalafiddle.io/embed?sfid=5R6oHm4/4))

Note:
- This helps us throttle our Futures without waiting for a whole batch to finish.

monix-eval provides Task

Task is an alternative "effect type" to Future

Note:
- Nothing about Future suggests you should do side effects inside it.
- You can use Futures for running CPU-bound, referentially transparent code concurrently.
- But in reality, most of us put effects inside Futures.
- Task is an alternative way to handle effects.

Unlike a Future, a Task is not eager. You have to explicitly run it.

Note:
- When you create a Future, it immediately starts running.

If you run a Task twice, it will actually run twice.

*demo*

Note:
- DEMO
- Cancellable. Timeout-able.

monix-reactive provides Observable

We've looked at a finite list of input documents. But streaming (AKA the Reactive Pattern) is also common.

Note:
- For me, that's mostly reading messages from Kafka.

Monix Observable is an implementation of the Reactive Pattern

With adapters to and from the Reactive Streams standard (java.util.concurrent.Flow)

The API is similar to Akka Streams

Note:
- So rather than demo both, let's take a look at Akka Streams.

## Akka Streams

[demo](https://embed.scalafiddle.io/embed?sfid=j8YzfY7/5)

- Mature and widely adopted. ✔
- Some boilerplate to use it. ✘
- You have to use Future. ✘

## Cats Effect

- It was started last April by Daniel Spiewak.
- The largest contributor now is Alexandru Nedelcu, who is the creator of Monix.

Provides an IO monad which is similar to Monix Task.

Provides typeclasses for effect types.

Just as List has Applicative, Monad, and Foldable...

...an "effect" type can have Sync, Async, and Concurrent.

Monix Task also implements those typeclasses.

Just as cats NonEmptyList is a more restricted List...

...cats IO is a more restricted Task.
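To make the "similar to Task" point concrete, here is a tiny sketch of cats-effect IO being lazy and re-runnable, just like Task. This assumes the cats-effect 1.x API, where `unsafeRunSync()` needs no extra setup:

```scala
import cats.effect.IO

// Building the IO runs nothing; like Task, it is just a
// description of the effect.
val program: IO[Unit] = IO {
  println("uploading...")
}

// Only running it executes the effect, and running it twice
// really does run it twice.
program.unsafeRunSync()
program.unsafeRunSync()
```

Doing the same with a Future wouldn't work: a Future starts running as soon as it is created and memoises its result.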
Check out Runar's great talk "Constraints Liberate, Liberties Constrain".

## ZIO

Another effect type

Note:
- Like Task or cats IO or Future.

Implements the Cats Effect typeclasses

You define the error type

```scala
IO.point("Hello World") // scalaz.zio.IO[Nothing, String]
```

Note:
- Cats Effect and Monix take the approach that the failure type has to be Throwable.
- Just as Future does.
- ZIO doesn't.

### Summary

**Use non-blocking libraries**

- Futures get you quite far.
- But there are pitfalls.

My thoughts

Note:
- I'm still using Futures and Akka Streams quite a lot day-to-day.
- I'm not going to refactor all my apps.
- But when I hit issues, I'll be reaching for a different effect type.
- Those effect types have been rapidly evolving, but I most like working with Monix.
- It kinda sucks that Akka Streams makes you use Future as your effect type.
- So I'll be looking to use Monix Observable in future.

# Questions?