Thread Pools #
The program discussed in the previous section might start more threads than processor cores are available. It is often more efficient to create threads corresponding to available cores in advance and reuse them. So called thread pools are used to implement this approach.
Before implementing a renderer using a thread pool we write tests to learn about their API.
Testing the API #
The following test creates a thread pool and submits a task for execution.
void testSubmittingToAThreadPool() {
final int threadCount = Runtime.getRuntime().availableProcessors();
final ExecutorService pool = Executors.newFixedThreadPool(threadCount);
final Future<Integer> future = pool.submit(() -> 42);
try {
assertEquals(42, future.get());
} catch (InterruptedException | ExecutionException e) {}
pool.shutdown();
}
The number of available processors is queried
and then used to create a thread pool of fixed size.
The submit
method is used to pass an anonymous
instance of
Callable
to be executed by a thread in the pool.
The result of submit
is an instance of
Future
that can be used to check if the task is completed using isDone
and to query the task result using get
.
The get
method blocks until the task is completed
similar to the join
method for threads.
It throws an InterruptedException
if the thread
executing the task is interrupted
and an
ExecutionException
if the task throws an exception.
There is another variant of get
that accepts a time limit as argument
and throws a
TimeoutException
if the task does not finish before the given limit.
Thread pools can be terminated with the shutdown
method.
After calling shutdown
, the pool does not accept
more tasks to be submitted
(throwing a
RejectedExecutionException
instead)
but continues to execute running tasks.
There is a variant shutdownNow
that attempts
to stop executing tasks and returns a list of waiting tasks.
The next test demonstrates how to cancel futures.
void testCancellingAFuture() {
final ExecutorService pool = Executors.newSingleThreadExecutor();
final List<?> interruptions = new ArrayList<>();
final Future<?> future = pool.submit(() -> {
while (!Thread.interrupted()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
interruptions.add(null);
return;
}
}
interruptions.add(null);
});
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {}
future.cancel(true);
assertTrue(future.isCancelled());
boolean cancelled = false;
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
} catch (CancellationException e) {
cancelled = true;
}
assertTrue(cancelled);
assertEquals(1, interruptions.size());
}
Just to demonstrate the use of a different executor,
we create a pool with a single thread.
This time we pass a task that does not return a result
as anonymous Runnable
instance to submit
.
After submitting the future,
we give the pool some time to start executing it
before calling cancel
on the returned future.
The cancel
method accepts a boolean argument.
If it is true
, the thread executing the task is interrupted.
Calling get
on a cancelled future
results in a
CancellationException.
Task: Write more tests #
Study the documentation for ExecutorService and Future. Write more tests that you find interesting. Alternatively, write tests documenting the behaviour of ScheduledExecutorServices. The class Executors has static methods to create them.
Rendering with a thread pool #
We are now ready to implement a renderer
that reuses threads when rendering an image
with more parts than available processors.
Here is the render
method
of the ThreadPoolRenderer
.
public boolean render(final Box box) {
final InterruptFlag self = new InterruptFlag();
return join(fork(ForkJoinPool.commonPool(), box, self), self);
}
We use ForkJoinPool.commonPool
to access a thread pool
that is configured to run many non-blocking tasks
optimized for the available processors.
We also instantiate an InterruptFlag
(discussed below)
and pass it to the fork
and join
methods.
Here is the implementation of fork
.
private List<Future<?>> fork(final ExecutorService pool, //
final Box box, final Interruptible origin) {
return box.split() //
.map(part -> (Runnable) () -> renderer.render(part, origin)) //
.map(pool::submit) //
.collect(Collectors.toList());
}
It returns a list of futures
corresponding to rendering tasks
using an underlying StreamRenderer
that is passed a reference to the interrupt flag
to be tested for interruptions.
The join
method waits for the tasks to complete
by calling get
on the corresponding futures.
private boolean join(final List<Future<?>> futures, //
final InterruptFlag self) {
return futures.stream().map(future -> {
try {
future.get();
return true;
} catch (InterruptedException | ExecutionException e) {
self.wasInterrupted(true);
return false;
}
}).allMatch(ok -> ok);
}
The result of join
signifies
whether the calling thread waited successfully for all tasks
without being interrupted.
If the calling thread is interrupted,
it changes the interrupt flag passed as second argument.
We use our own interrupt flag,
because the one associated with threads in Java might be reset,
which could lead to started threads not being able to notice the interruption.
The following picture has been created using the ThreadPoolRenderer
.
This time parts colored with the same hue have been rendererd by the same thread.
The picture has been created by zooming in on some part of the Mandelbrot fractal. Rendering is more costly when zoomed in, and bright parts take significantly longer to compute than dark parts. When observing the rendering process closely, we see that the bottom right part of the picture is completed last. In the end, only a few threads are occupied rendering the remaining parts of the image even if more processors would be available.