Java - How does ThreadPoolExecutor add new threads



In this post, we will take a look at the implementation of ThreadPoolExecutor and see what happens when ThreadPoolExecutor receives a new task.


Implementation Detail

State Variable

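The first section of the ThreadPoolExecutor class defines a couple of constants and utility methods. ctl is a control variable that tracks the state of the ThreadPoolExecutor. The state consists of two fields packed into a single int: the run state (runState), stored in the 3 high-order bits, and the number of workers (workerCount), stored in the remaining 29 bits.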

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
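To make the packing concrete, here is a small stand-alone sketch that copies the constants and helpers from the listing above into a hypothetical class named CtlDemo. The isRunning helper is used by execute but not shown in this post, so the version below is an assumption: it relies on RUNNING being the only negative run state.

```java
// Stand-alone copy of the ctl bit layout for experimentation.
// CtlDemo is a hypothetical name; the constants mirror the listing above.
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;   // low 29 bits all set

    static final int RUNNING  = -1 << COUNT_BITS;          // negative: high bits 111
    static final int SHUTDOWN =  0 << COUNT_BITS;          // zero: high bits 000

    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Assumed helper: RUNNING is the only negative state, so a comparison is enough.
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                          // running pool with 5 workers
        System.out.println(workerCountOf(c));               // prints 5
        System.out.println(runStateOf(c) == RUNNING);       // prints true
        System.out.println(isRunning(c));                   // prints true
        System.out.println(isRunning(ctlOf(SHUTDOWN, 5)));  // prints false
    }
}
```

Packing both fields into one AtomicInteger lets the executor read or update the run state and the worker count in a single atomic operation.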

Execute a task/command

Now let's take a look at how ThreadPoolExecutor executes a task.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

The first if block validates the input. At line 4, int c = ctl.get() reads the current state of the ThreadPoolExecutor. Lines 5-9 handle the case where the number of workers is less than the core pool size. Recall that the core pool size is the minimum number of threads that we want to maintain. In this case execute calls addWorker(command, true); if that call succeeds, the new worker takes care of running command and execute returns immediately. If it fails, execute re-reads ctl and falls through to the next block. Also note that the first parameter of addWorker is named firstTask, which indicates that the command passed to addWorker becomes the first task of the newly created worker.
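One consequence of the core-size branch is that the pool keeps creating threads until it reaches the core pool size, even when the existing workers are idle. The sketch below illustrates this under a few assumptions: CorePoolGrowthDemo and poolSizeAfter are hypothetical names, and the tasks are empty and submitted one at a time so that every earlier worker is idle when the next task arrives.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class CorePoolGrowthDemo {

    // Runs taskCount empty tasks one after another and reports the pool size.
    static int poolSizeAfter(int corePoolSize, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, corePoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < taskCount; i++) {
                // get() waits for completion, so the worker is idle before the next submit.
                pool.submit(() -> { }).get();
            }
            return pool.getPoolSize();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfter(3, 1)); // prints 1
        System.out.println(poolSizeAfter(3, 3)); // prints 3
        System.out.println(poolSizeAfter(3, 5)); // prints 3
    }
}
```

The third call stays at 3 because, once the worker count equals the core pool size, later tasks go to the queue and are picked up by idle workers.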

Lines 10-19 are the most interesting part because this is the logic that adds new threads to the pool. addWorker is called at line 15 and line 17. The call at line 15, addWorker(null, false), is probably not the happy path: it runs only when the command has already been queued and the recheck finds zero workers. The call at line 17, addWorker(command, false), is reached only when the pool is not running or workQueue.offer(command) returns false. It follows that, for a running ThreadPoolExecutor, the result of workQueue.offer(command) determines whether the ThreadPoolExecutor will try to add a new thread.
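The zero-worker branch is easiest to observe with a core pool size of 0. In the sketch below (ZeroCoreDemo and its helper names are hypothetical), the first task is queued, the recheck sees no workers, and addWorker(null, false) starts a single thread to drain the queue. Later tasks are simply queued, so the pool stays at one thread even though maximumPoolSize is 4.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class ZeroCoreDemo {

    // Submits taskCount blocking tasks to a pool with corePoolSize 0 and an unbounded queue.
    static int poolSizeWithZeroCore(int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the queued tasks finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeWithZeroCore(1)); // prints 1
        System.out.println(poolSizeWithZeroCore(3)); // prints 1
    }
}
```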

According to the documentation of BlockingQueue:

boolean offer(E e)

Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available. When using a capacity-restricted queue, this method is generally preferable to add(E), which can fail to insert an element only by throwing an exception.
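Since the outcome of offer drives the decision in execute, it helps to see how the common queue implementations answer it. The following sketch (OfferDemo and offerTwice are hypothetical names) offers two no-op tasks to each queue while no consumer is taking from it.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the JDK.
final class OfferDemo {

    // Offers two tasks in a row and records whether each offer was accepted.
    static boolean[] offerTwice(BlockingQueue<Runnable> queue) {
        Runnable noop = () -> { };
        return new boolean[] { queue.offer(noop), queue.offer(noop) };
    }

    public static void main(String[] args) {
        // Unbounded: there is always room.
        System.out.println(Arrays.toString(
                offerTwice(new LinkedBlockingQueue<>())));   // prints [true, true]
        // Capacity 1: the second offer finds the queue full.
        System.out.println(Arrays.toString(
                offerTwice(new ArrayBlockingQueue<>(1))));   // prints [true, false]
        // No capacity: offer fails unless a consumer is already waiting.
        System.out.println(Arrays.toString(
                offerTwice(new SynchronousQueue<>())));      // prints [false, false]
    }
}
```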

Notice that the decision to try adding a new thread is made by workQueue, so the core pool size and maximum pool size play no role at this point. The worker count is checked against maximumPoolSize inside the addWorker method:

// This is part of addWorker method.
if (workerCountOf(c)
    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
    return false;

If the pool has reached its maximum size, addWorker returns false, line 18 of the execute method (reject(command)) runs, and the command is rejected. We can implement a RejectedExecutionHandler to process rejected tasks/commands. The default RejectedExecutionHandler is AbortPolicy, which throws a RejectedExecutionException.
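The rejection path can be reproduced with a deliberately tiny pool. The sketch below assumes one thread and a queue of capacity 1, with every task blocked on a latch so that nothing completes during the test; RejectionDemo and countRejected are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class RejectionDemo {

    // Submits taskCount blocking tasks and counts how many the default AbortPolicy rejects.
    static int countRejected(int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int rejected = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // queue full and pool already at maximumPoolSize
                }
            }
        } finally {
            release.countDown(); // unblock the accepted tasks
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(countRejected(2)); // prints 0
        System.out.println(countRejected(5)); // prints 3
    }
}
```

Of five tasks, one occupies the only thread, one sits in the queue, and the remaining three fail both offer and addWorker.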

This is important. As mentioned in the documentation of ThreadPoolExecutor:

Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
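The effect described in the documentation can be checked by running the same workload against two queues. This is a sketch under stated assumptions: QueueChoiceDemo and poolSizeWith are hypothetical names, the pool uses corePoolSize 1 and maximumPoolSize 4, and all tasks block on a latch so that no worker becomes free while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class QueueChoiceDemo {

    // Submits taskCount blocking tasks to a 1..4 thread pool backed by the given queue.
    static int poolSizeWith(BlockingQueue<Runnable> queue, int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60L, TimeUnit.SECONDS, queue);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // unblock all tasks
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: offer always succeeds, so only the core thread exists.
        System.out.println(poolSizeWith(new LinkedBlockingQueue<>(), 5));  // prints 1
        // Bounded queue: once it is full, offer fails and extra threads are added.
        System.out.println(poolSizeWith(new ArrayBlockingQueue<>(1), 5));  // prints 4
    }
}
```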

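Another example is Executors.newCachedThreadPool. This method returns a cached thread pool that adds new threads dynamically. Its implementation uses a SynchronousQueue rather than a LinkedBlockingQueue. A SynchronousQueue has no capacity, so offer succeeds only when an idle worker is already waiting to take the task; otherwise execute falls through to addWorker.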

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
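To see the dynamic growth, the sketch below builds a ThreadPoolExecutor with the same constructor arguments as the listing above and submits tasks that all block on a latch. CachedPoolDemo and poolSizeAfter are hypothetical names. Because no worker is ever idle, each offer on the SynchronousQueue fails and each task gets its own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class CachedPoolDemo {

    // Submits taskCount blocking tasks to a pool configured like newCachedThreadPool.
    static int poolSizeAfter(int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // unblock all tasks
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfter(3)); // prints 3
    }
}
```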

In short, if an unbounded LinkedBlockingQueue is used, then maximumPoolSize is ignored. You can find more details on this topic at ExecutorService Gotcha.
