June 30, 2015
Engineering

Throttling Instantiations of Scala Futures

Scala allows us to easily execute a block of code asynchronously using futures from scala.concurrent. At Quantifind, we use Scala futures to run hundreds of thousands of numerical calculations concurrently. Recently, however, we ran into an issue where the JVM ran out of memory and crashed due to a lack of available heap space. The root cause turned out to be over-instantiation of futures.

A scala.concurrent.ExecutionContext is needed any time one spawns futures in Scala. Most of the time it’s sufficient to use the default “global” ExecutionContext, which one can bring into scope by importing scala.concurrent.ExecutionContext.Implicits.global.
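
For example, with the global context in scope, spawning a future is a one-liner (a minimal sketch; doSomething is a hypothetical stand-in for any expensive call):

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

// the global context executes this block on one of its pool threads
val f: Future[Int] = future {
  doSomething() // hypothetical expensive computation returning an Int
}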

The global ExecutionContext, as well as the “custom fixed” versions we mostly create in our code base, is essentially backed by an unbounded LinkedBlockingQueue. (Strictly speaking, the global version attempts to create a ForkJoinPool for work stealing rather than a plain LinkedBlockingQueue, but the important point is that its work queue, too, is unbounded.)

Here’s how we generally spawn futures in Scala:

import java.util.concurrent.Executors
import scala.concurrent._
// number of available processors on the box
val numWorkers = sys.runtime.availableProcessors
// underlying thread pool with a fixed number of worker threads,
// backed by an unbounded LinkedBlockingQueue[Runnable]
val pool = Executors.newFixedThreadPool(numWorkers)
// the ExecutionContext that wraps the thread pool
implicit val ec = ExecutionContext.fromExecutorService(pool)
// the future spawned below is instantiated immediately and enqueued
// on the unbounded LinkedBlockingQueue[Runnable] attached to the thread pool
future {
  // do some work, e.g. call a computationally expensive method
  // or open a socket and send some data
  doWork()
}

Notice in the code snippet above that the future is enqueued onto the work queue as soon as it is instantiated, and is executed later, once a worker thread becomes available. If no worker thread is free, the future simply sits in the queue, and if you keep creating futures, they keep piling up. This is a problem in systems that spawn a lot of futures, since there is no throttling or back-pressure to slow down the rate of future allocations! Eventually the JVM fails with out-of-memory errors because the garbage collector cannot free up enough memory for new future allocations.
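
To see how quickly this gets out of hand, here is a minimal sketch (expensiveComputation is a hypothetical stand-in for real work): the loop below allocates and enqueues a million futures far faster than the pool can drain them.

// reusing the unbounded ExecutionContext from the snippet above:
// every iteration allocates a future and enqueues it immediately,
// so the unbounded queue grows until the heap is exhausted
(1 to 1000000) foreach { i =>
  future {
    expensiveComputation(i) // hypothetical long-running call
  }
}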

So, what we really want is a way to throttle the creation of futures and give the worker threads a chance to drain the queue of pending work. In other words, we want back-pressure: the caller should block, unable to create and enqueue more futures, until capacity frees up. Essentially, we want a bounded queue with a fixed capacity, so that the next future instantiation blocks until space in the queue becomes available.

A while back there was a post on the scala-user mailing list from a user with a similar problem. Hua Jiang posted a nice, concise solution that shows how to create a custom ExecutionContext backed by a bounded queue, forcing the caller to block until space becomes available in the queue.

Here's how you create it:

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent._

val numWorkers = sys.runtime.availableProcessors
val queueCapacity = 100
implicit val ec = ExecutionContext.fromExecutorService(
  new ThreadPoolExecutor(
    numWorkers, numWorkers,
    0L, TimeUnit.SECONDS,
    new ArrayBlockingQueue[Runnable](queueCapacity) {
      override def offer(e: Runnable) = {
        put(e) // blocks the caller if the queue is full
        true
      }
    }
  )
)

Notice that in the code above we create our own blocking queue with the offer method overridden. Normally, offer returns immediately with true or false, indicating whether the item could be enqueued right away. The underlying ThreadPoolExecutor enqueues tasks by calling offer, so we override it to delegate to put, which blocks the caller until space becomes available in the queue. The net result is that future instantiations are finally controlled with proper back-pressure.
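
With the bounded context implicitly in scope, the same fan-out loop from before now self-throttles (again using the hypothetical expensiveComputation):

// once queueCapacity tasks are pending, the next future { ... } call
// blocks inside put until a worker thread frees up a slot in the queue
(1 to 1000000) foreach { i =>
  future {
    expensiveComputation(i) // hypothetical long-running call
  }
}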

One must be careful using this approach to throttle future instantiations, however: deadlocks can occur if you spawn new futures from within other futures. If every worker thread ends up blocked in put, waiting to enqueue an inner future onto a full queue, no thread is left to drain the queue. We originally ran into this problem when switching to this approach, and solved it by using two different ExecutionContext instances. This is something to keep in mind when adopting this method.
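
For illustration, here is a sketch of that two-context workaround, assuming the outer futures fan out to inner ones (boundedExecutionContext, outerEc, and innerEc are our names for this sketch, not part of Hua Jiang's original snippet):

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent._

// hypothetical helper wrapping the bounded-queue recipe above
def boundedExecutionContext(workers: Int, capacity: Int): ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(
      workers, workers,
      0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[Runnable](capacity) {
        override def offer(e: Runnable) = { put(e); true }
      }
    )
  )

val numWorkers = sys.runtime.availableProcessors
// two separate contexts: outer workers blocked on innerEc's full queue
// can always be unblocked by innerEc's own worker threads
val outerEc = boundedExecutionContext(numWorkers, 100)
val innerEc = boundedExecutionContext(numWorkers, 100)

future {
  // outer-level work that fans out to inner futures
  future {
    // inner-level work; enqueued on innerEc, never on outerEc
  }(innerEc)
}(outerEc)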