# Reading the ThreadPoolExecutor Source

I read through the source of `ThreadPoolExecutor` (JDK 11) and took some brief notes.

## The Executor framework

### Executor

The `Executor` interface has a single method:

```java
public interface Executor {
    void execute(Runnable command);
}
```
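
The `Executor` interface decouples task submission from the mechanics of how each task is run. An `Executor` implementation is not required to be asynchronous.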
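
To illustrate the last point, here is a minimal sketch of a synchronous `Executor` that runs every task on the caller's thread. The names `DirectExecutorDemo`, `DIRECT` and `runsOnCallerThread` are made up for this example.

```java
import java.util.concurrent.Executor;

class DirectExecutorDemo {
    // A synchronous Executor: execute() simply calls run() on the calling thread.
    static final Executor DIRECT = Runnable::run;

    // Returns true if the submitted task ran on the thread that submitted it.
    static boolean runsOnCallerThread() {
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];
        DIRECT.execute(() -> ranOn[0] = Thread.currentThread());
        return ranOn[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println(runsOnCallerThread()); // prints "true"
    }
}
```

Because `execute()` returns only after `run()` has finished, callers of such an executor cannot assume that submission is non-blocking.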
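
### ExecutorService

`ExecutorService` extends `Executor` with methods to manage termination and methods that produce a `Future` for tracking the progress of one or more asynchronous tasks:

- `shutdown()` starts an orderly shutdown and does not block. The `ExecutorService` keeps executing tasks that were already submitted but accepts no new ones. Calling it on a service that is already shut down has no additional effect.
- `shutdownNow()` attempts to stop the tasks that are currently executing, does not start the tasks that are still waiting, and returns the list of waiting tasks. It does not block either. The stop is best-effort only: typical implementations cancel through thread interruption, so a task that does not respond to interrupts may never terminate.
- `invokeAll()` executes all tasks in the given collection and returns a list of `Future`s once all of them have completed. It is interruptible. The result is undefined if the collection is modified while the operation is in progress.
- `invokeAny()` executes the tasks in the given collection and returns the result of one that completed successfully (without throwing), cancelling the tasks that have not completed. It is interruptible. The result is undefined if the collection is modified while the operation is in progress.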
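
The `shutdown()` behaviour described above can be observed with a small sketch. It assumes a single-threaded pool from `Executors.newSingleThreadExecutor()`; the class and method names (`ShutdownDemo`, `demo`) are invented for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    static String demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> f = pool.submit(() -> {
            Thread.sleep(100);
            return "done";
        });
        pool.shutdown(); // returns immediately; the submitted task keeps running

        boolean rejected = false;
        try {
            pool.submit(() -> "too late"); // new work is refused after shutdown()
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return f.get() + " rejected=" + rejected + " terminated=" + terminated;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

`awaitTermination()` is what actually blocks here; `shutdown()` only flips the pool into a state where it drains existing work.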

### AbstractExecutorService

`AbstractExecutorService` provides default implementations of the `ExecutorService` execution methods. The class implements `submit()`, `invokeAll()` and `invokeAny()` using the `RunnableFuture` returned by `newTaskFor()` (a `FutureTask` by default).
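
`RunnableFuture` extends both `Runnable` and `Future`. Once its `run()` method has executed successfully, the `Future` is marked complete and its result becomes available:

```java
public interface RunnableFuture<V> extends Runnable, Future<V> {
    // Sets this Future to the result of its computation
    // unless it has been cancelled.
    void run();
}
```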
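
### FutureTask

`FutureTask` implements `RunnableFuture` and represents a cancellable computation. The result can only be retrieved once the computation has completed, and a completed computation can no longer be cancelled or restarted, unless it is invoked through `runAndReset()`.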

`FutureTask` has 7 states:

- `NEW`
- `COMPLETING`
- `NORMAL`
- `EXCEPTIONAL`
- `CANCELLED`
- `INTERRUPTING`
- `INTERRUPTED`

Possible state transitions:

- `NEW -> COMPLETING -> NORMAL`
- `NEW -> COMPLETING -> EXCEPTIONAL`
- `NEW -> CANCELLED`
- `NEW -> INTERRUPTING -> INTERRUPTED`
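
The internal states are private, but three of the transition paths can be observed through the public API. The sketch below drives a `FutureTask` by hand, without any executor; `FutureTaskStatesDemo` and its method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskStatesDemo {
    // NEW -> COMPLETING -> NORMAL: run() stores the result, get() returns it.
    static int normalPath() throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> 21 * 2);
        task.run(); // executed synchronously on this thread
        return task.get();
    }

    // NEW -> COMPLETING -> EXCEPTIONAL: get() wraps the failure in ExecutionException.
    static String exceptionalPath() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        }
    }

    // NEW -> CANCELLED: cancel(false) before run() turns run() into a no-op.
    static boolean cancelledPath() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1);
        boolean cancelled = task.cancel(false);
        task.run();
        return cancelled && task.isCancelled() && task.isDone();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(normalPath());      // 42
        System.out.println(exceptionalPath()); // boom
        System.out.println(cancelledPath());   // true
    }
}
```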

`FutureTask` updates `state`, `runner` and `waiters` through `VarHandle`s, typically with `compareAndSet()`:

```java
private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        STATE = l.findVarHandle(FutureTask.class, "state", int.class);
        RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
        WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }

    // Touch LockSupport so the class is loaded before the first park() call.
    Class<?> ensureLoaded = LockSupport.class;
}

protected void set(V v) {
    // Only the thread that wins NEW -> COMPLETING may publish the outcome.
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}
```

Now look at the `get()` method (the timed variant):

```java
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // Still NEW or COMPLETING: wait, and time out if not done in time.
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    long startTime = 0L;    // 0 means "not parked yet"
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        int s = state;
        if (s > COMPLETING) {
            // Finished (normally, exceptionally or cancelled).
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // The outcome is being published; yield instead of parking.
            Thread.yield();
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        else if (!queued)
            // Push this node onto the head of the waiters stack.
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        else if (timed) {
            final long parkNanos;
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // Recheck the state before parking.
            if (state < COMPLETING)
                LockSupport.parkNanos(this, parkNanos);
        }
        else
            LockSupport.park(this);
    }
}
```

Next, the `run()` method:

```java
public void run() {
    // Only one thread may claim the task, and only while it is still NEW.
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner stays non-null until the state is settled,
        // which prevents concurrent calls to run().
        runner = null;
        // Re-read the state after clearing runner so that an
        // in-flight cancel(true) interrupt is not leaked.
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            // The whole waiter stack is detached; wake every parked thread.
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help GC
                q = next;
            }
            break;
        }
    }

    done();

    callable = null; // to reduce footprint
}
```

### Execution methods of AbstractExecutorService

Here are the execution methods implemented by `AbstractExecutorService`. Only the variants that take a `Callable` are shown:

```java
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        // Remember the last failure so it can be thrown if no task succeeds.
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // Start one task for sure; submit the rest incrementally.
        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;

        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        cancelAll(futures);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        // Wait for every task; individual failures stay inside their Future.
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try { f.get(); }
                catch (CancellationException | ExecutionException ignore) {}
            }
        }
        return futures;
    } catch (Throwable t) {
        cancelAll(futures);
        throw t;
    }
}
```
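
From the caller's side, the two bulk methods behave as sketched below. The example assumes a fixed pool of two threads; `InvokeDemo` is a made-up name. Note how `invokeAny()` skips the failing task, which matches the `catch (ExecutionException eex)` branch in `doInvokeAny()`.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeDemo {
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = List.of(() -> 1, () -> 2, () -> 3);
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks))
                sum += f.get(); // every Future is already done at this point

            // One task fails, so the only result invokeAny() can return is "ok".
            List<Callable<String>> mixed = List.of(
                () -> { throw new IllegalStateException("fails"); },
                () -> "ok");
            String any = pool.invokeAny(mixed);
            return sum + " " + any;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // 6 ok
    }
}
```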

## Constructors

`ThreadPoolExecutor` has 4 constructors in total; only two are shown here:

```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime); // stored in nanoseconds
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
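
Parameters:

- `corePoolSize`: the number of threads to keep in the pool even if they are idle, unless `allowCoreThreadTimeOut` is set to `true`;
- `maximumPoolSize`: the maximum number of threads allowed in the pool;
- `keepAliveTime`: the maximum time that excess idle threads wait for new tasks before terminating;
- `unit`: the time unit of `keepAliveTime`;
- `workQueue`: the queue that holds tasks waiting to be executed. It only holds the `Runnable` tasks submitted through `execute()`;
- `threadFactory`: the factory the executor uses to create new threads. By default the factory returned by `Executors.defaultThreadFactory()` is used;
- `handler`: the handler invoked when execution is blocked because the thread bound and the queue capacity have both been reached. The default is `AbortPolicy`.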
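
As a concrete illustration of these parameters, the sketch below builds a pool with every argument spelled out and also triggers the argument check from the constructor. The specific values (2, 4, 30 seconds, a queue of 8) and the names `ConstructorDemo`, `newPool` and `rejectsInvalidSizes` are arbitrary choices for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
            2,                                     // corePoolSize
            4,                                     // maximumPoolSize
            30, TimeUnit.SECONDS,                  // keepAliveTime and unit
            new ArrayBlockingQueue<>(8),           // workQueue (bounded)
            Executors.defaultThreadFactory(),      // threadFactory
            new ThreadPoolExecutor.AbortPolicy()); // handler
    }

    // maximumPoolSize < corePoolSize fails the constructor's argument check.
    static boolean rejectsInvalidSizes() {
        try {
            new ThreadPoolExecutor(4, 2, 30, TimeUnit.SECONDS,
                                   new ArrayBlockingQueue<>(8));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getPoolSize());    // 0: threads are created lazily
        System.out.println(rejectsInvalidSizes()); // true
        pool.shutdown();
    }
}
```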
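
## State

`ThreadPoolExecutor` has 5 run states:

- `RUNNING`: accepts new tasks and processes queued tasks;
- `SHUTDOWN`: does not accept new tasks but still processes queued tasks; new threads may still be created in this state;
- `STOP`: does not accept new tasks, does not process queued tasks, and interrupts in-progress tasks;
- `TIDYING`: all tasks have terminated and `workerCount` is 0; the thread that moves the state to `TIDYING` runs the `terminated()` hook method;
- `TERMINATED`: `terminated()` has completed.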

State transitions:

- `RUNNING -> SHUTDOWN`: on invocation of `shutdown()`
- `(RUNNING or SHUTDOWN) -> STOP`: on invocation of `shutdownNow()`
- `SHUTDOWN -> TIDYING`: when both the queue and the pool are empty
- `STOP -> TIDYING`: when the pool is empty
- `TIDYING -> TERMINATED`: when the `terminated()` hook method has completed

`workerCount` and the run state are packed into a single `AtomicInteger` (`ctl`): the high 3 bits hold the pool state and the low 29 bits hold `workerCount`:

```java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
```
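
The arithmetic is easy to check in isolation. The sketch below copies the constants and helpers into a standalone class (`CtlPackingDemo` is a made-up name, and the members are made non-private only so they can be called from outside) and packs a `RUNNING` state together with 5 workers.

```java
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;      // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high bits 000
    static final int STOP       =  1 << COUNT_BITS;      // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;      // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;      // high bits 011

    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toHexString(c));   // e0000005
        System.out.println(workerCountOf(c));         // 5
        System.out.println(runStateOf(c) == RUNNING); // true
        // The states are numerically ordered, so state checks are plain comparisons.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                           && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```

Because `RUNNING` is the only negative value, "is the pool running" reduces to a single comparison against `SHUTDOWN`.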
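
`workerCount` is the effective number of threads: the number of workers that have been permitted to start and not yet permitted to stop. It may transiently differ from the actual number of live threads. The user-visible pool size is the current size of the `workers` set.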

## Worker and task scheduling

Each worker thread is wrapped in a `Worker`, and all workers are stored in a `HashSet` (`workers`) guarded by `mainLock`:

```java
private final HashSet<Worker> workers = new HashSet<>();

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    // Thread this worker runs in; null if the factory fails.
    final Thread thread;
    // Initial task to run; possibly null.
    Runnable firstTask;
    // Per-thread task counter.
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // Delegates the main run loop to the outer runWorker().
    public void run() {
        runWorker(this);
    }

    // ...
}
```

`Worker.run()` is trivial: it just calls `runWorker()`. Here is the source of that method:

```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is stopping, make sure the thread is interrupted;
            // otherwise make sure it is not. The second check handles a
            // race with shutdownNow() while the interrupt is being cleared.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
```
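
The `beforeExecute()` and `afterExecute()` calls in `runWorker()` are protected hooks that subclasses may override. A minimal sketch, assuming a single-threaded pool and an invented class name `HookDemo`, records the order in which the hooks and the task run:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    static List<String> demo() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable ex) {
                super.afterExecute(r, ex);
                events.add("after");
            }
        };
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS); // worker has exited by now
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // [before, task, after]
    }
}
```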

`getTask()` returns `null` in the following 4 cases:

- `workerCount` is greater than `maximumPoolSize` (for example after a call to `setMaximumPoolSize()`);
- the pool is in the `STOP` state or later;
- the pool is in the `SHUTDOWN` state and the task queue is empty;
- the worker timed out while waiting for a task, and timed-out workers are subject to termination.

```java
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check whether the queue is empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Is this worker subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
```
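
The timeout case can be observed from outside by letting core threads time out. In this sketch (hypothetical class `KeepAliveDemo`, with a 50 ms keep-alive chosen only to keep the demo fast) the single worker exits after going idle, and the pool size drops back to 0. The wait loop is bounded at 5 seconds so the demo cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size while the task is running, pool size after idling}.
    static int[] demo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core workers now use the timed poll()

        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int busy = pool.getPoolSize(); // the worker is registered before execute() returns
        release.countDown();

        // Wait (bounded) for the idle worker to time out in getTask() and exit.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
            Thread.sleep(10);
        int idle = pool.getPoolSize();
        pool.shutdown();
        return new int[] { busy, idle };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = demo();
        System.out.println(r[0] + " -> " + r[1]);
    }
}
```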

`processWorkerExit()` performs cleanup and bookkeeping for a dying worker. It is called only from worker threads:

```java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // if abrupt, workerCount was not adjusted yet
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Replace the worker that died (or keep the minimum alive).
        addWorker(null, false);
    }
}
```
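
One visible consequence of the `completedAbruptly` branch: when a task submitted through `execute()` throws, its worker thread dies and a different thread runs the following task. The sketch below checks exactly that. `WorkerReplacementDemo` is a made-up name, and the custom thread factory exists only to silence the default uncaught-exception stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    static boolean secondTaskRunsOnNewThread() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
            r -> {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep stderr quiet
                return t;
            });
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        pool.execute(() -> {
            first.set(Thread.currentThread());
            throw new IllegalStateException("boom"); // escapes runWorker()
        });
        pool.execute(() -> {
            second.set(Thread.currentThread());
            done.countDown();
        });

        boolean ran = done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return ran && first.get() != null && first.get() != second.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(secondTaskRunsOnNewThread()); // true
    }
}
```

Tasks submitted through `submit()` behave differently, because `FutureTask.run()` catches the exception and stores it in the `Future`, so the worker survives.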

## Submitting tasks

`ThreadPoolExecutor` does not override `submit()`, so looking at `execute()` is enough:

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. Fewer than corePoolSize workers: start a new one with this task.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. Otherwise queue the task, then recheck the pool state.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. Could not queue: try a worker bounded by maximumPoolSize, else reject.
    else if (!addWorker(command, false))
        reject(command);
}
```
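
The three steps of `execute()` show up clearly with a deliberately tiny pool. The sketch assumes `corePoolSize = 1`, `maximumPoolSize = 2` and a queue of capacity 1 (values picked for the demo), and submits four tasks that block on a latch. `ExecuteOrderDemo` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    log.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(demo());
        // task1: threads=1 queued=0   (new core worker)
        // task2: threads=1 queued=1   (queued)
        // task3: threads=2 queued=1   (queue full, non-core worker)
        // task4: rejected             (queue full, pool at maximum)
    }
}
```

Note that the third task overtakes the second: it becomes the `firstTask` of the new worker while the second is still waiting in the queue.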
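
`addWorker()` takes two parameters, `Runnable firstTask` and `boolean core`. `firstTask` is the first task the new worker thread should run. If `core` is `true`, `corePoolSize` is used as the bound; otherwise `maximumPoolSize` is used. A boolean flag is passed instead of the bound's value so that the bound is read fresh after the other pool state has been checked.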
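
`addWorker()` does two main things:

1. It checks whether a new worker can be added given the current pool state and the given bound (core or maximum);
2. If so, it adjusts the worker count and, if possible, creates and starts a new worker that runs `firstTask` as its first task.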

Here is the source of `addWorker()`:

```java
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check whether the queue is empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else the CAS failed because workerCount changed; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding the lock. Back out if the
                // ThreadFactory failed or the pool was shut down
                // before the lock was acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
```

## Shutting down the pool

Here are the shutdown methods:

```java
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();   // interrupts all started workers, idle or not
        tasks = drainQueue(); // hands back the tasks that never started
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            if (nanos <= 0L)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}
```
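
A short sketch of what `shutdownNow()` looks like to a caller: one running task is interrupted, and the two tasks still in the queue are returned instead of being run. The class name `ShutdownNowDemo` and the 60-second sleep are arbitrary choices for the demo.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    static String demo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // responds to interruption
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { }); // stays in the queue
        pool.execute(() -> { }); // stays in the queue

        started.await(); // make sure the first task is really running
        List<Runnable> pending = pool.shutdownNow();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return "pending=" + pending.size()
             + " interrupted=" + interrupted.get()
             + " terminated=" + terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // pending=2 interrupted=true terminated=true
    }
}
```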

In `tryTerminate()`, once the pool state has been moved to `TERMINATED`, `termination.signalAll()` wakes up the threads that are waiting for the pool to terminate:

```java
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Not eligible: still running, already terminating,
        // or SHUTDOWN with tasks left in the queue.
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
```
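
The ordering in `tryTerminate()` (run `terminated()` in `TIDYING`, then set `TERMINATED`, then signal) can be checked by overriding the hook. In this sketch, with the invented class name `TerminatedHookDemo`, the hook records what `isTerminated()` reports while it runs.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    static String demo() throws InterruptedException {
        AtomicBoolean hookRan = new AtomicBoolean();
        AtomicBoolean terminatedInsideHook = new AtomicBoolean();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                hookRan.set(true);
                // The pool is in TIDYING here, so it is not yet terminated.
                terminatedInsideHook.set(isTerminated());
                super.terminated();
            }
        };

        pool.execute(() -> { });
        pool.shutdown();
        // Returns once tryTerminate() has set TERMINATED and called signalAll().
        boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
        return "hookRan=" + hookRan.get()
             + " terminatedInsideHook=" + terminatedInsideHook.get()
             + " terminatedAfter=" + (done && pool.isTerminated());
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
        // hookRan=true terminatedInsideHook=false terminatedAfter=true
    }
}
```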