沉下心,才会远离烦恼。
java
提供了多线程,用户只要继承Thread
类或实现Runnable
接口就能轻松达到多线程的目的。简单的应用时,我们硬编码固定的线程数可能就能满足要求,但是涉及到线程资源的重复利用、管理、响应性能等,我们就需要线程池来协助了。类似数据库连接池,线程池主要有以下优点:
- 创建线程也需要消耗,池中线程可重复利用,降低资源消耗
- 线程提前创建,提高响应速度
- 提高线程可管理性
Java 1.5
中引入了Executor
框架把任务的提交和执行进行了解耦。只需要定义好任务,然后提交给线程池。而不用关心任务如何被执行、被哪个线程执行、以及什么时候执行等。
Executor
接口:1
2
3public interface Executor {
void execute(Runnable command);
}
Executor
只是一个简单的接口,但它为灵活而强大的框架创造了基础,Executor
基于 生产者-消费者模式。如果你在程序中实现一个生产者-消费者的设计,使用Executor
通常是最简单的方式。
Demo 1
1 | public class ExecutorCase { |
ThreadPoolExecutor
Executors
是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool
方法产生一个拥有固定数量的线程池。1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
其中ExecutorService
接口继承接口Executor
,方法内本质是通过不同参数初始化ThreadPoolExecutor
,下面看下这个方法是怎么定义的:1
2
3
4
5
6
7public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
最准确的注释,你还可以查看jdk
源码中的英文注释。
corePoolSize
要保存在池中的线程数,包括空闲的。除非allowCoreThreadTimeOut
参数被设置。如果执行了prestartAllCoreThreads()
方法,将提前创建并启动所有核心线程。
maximumPoolSize
线程池允许的最大线程数,超出的提交将进入BlockingQueue
阻塞队列,故executor.execute(xxTask)
之后的代码不会因线程数量的限定而阻塞。
keepAliveTime
线程的空闲存活时间。该参数只在线程数大于核心线程数时起作用,结合corePoolSize
的注释理解。
unit
keepAliveTime
的单位。
workQueue
保存任务的阻塞队列,限定了队列中只能存储实现了Runnable
接口的任务。BlockingQueue<Runnable>
接口在JDK
中有以下实现:
ArrayBlockingQueue
: 基于数组结构的有界阻塞队列。LinkedBlockingQueue
: 基于链表机构的阻塞队列。SynchronusQueue
: 一个不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入一直处于阻塞状态。PriorityBlockingQueue
: 具有优先级的无界阻塞队列。
前两者的味道类似于ArrayList
与LinkedList
,主要是具有数据结构Array
、链表
的特点。而SynchronusQueue
则类似于CSP
场景中,一个没有buffer
缓冲的channel
,《七周七并发模型》中一书中的CSP
模型中提到新手往往会认为有缓存的channel会比无缓存的channel应用更广泛,但实际情况却恰恰相反。
,虽然这不一定对,但是这提醒了我们一定要根据场景去选择使用。PriorityBlockingQueue
则是更接近场景需求优先级的解决办法。
threadFactory
创建线程的工厂,具有名称前缀pool-
,主要实现如下:1
2
3
4
5
6
7
8DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
handler
任务队列达到限制的饱和处理策略。线程池提供了4中策略:
AbortPolicy
: 直接抛出异常,默认策略CallerRunsPolicy
: 用调用者所在线程来执行任务DiscardOldesPolicy
: 丢弃队列最前面的任务,执行新的任务。类似于CSP
模型中的sliding
方式DiscardPolicy
: 直接丢弃任务。类似于CSP
模型中的dropping
方式
如果以上都不满足你的需求,你还可以自己实现RejectedExecutionHandler
接口,自定义饱和处理策略,比如日志记录、邮件提醒等。
Executors
Executors
工厂类提供了线程的初始化接口,主要有如下几种:
newFixedThreadPool
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
功能如其名,入参只有一个数字。指定固定的线程个数,其中 corePoolSize == maximumPoolSize
,0L
代表不会释放core
线程,使用LinkedBlocingQueue
作为任务队列。
newCachedThreadPool
1 | public static ExecutorService newCachedThreadPool() { |
初始化一个缓存限定时间线程的线程池,默认缓存60s,线程空闲超过60s时会自动释放线程,不会保留core
线程。
newSingleThreadExecutor
1 | public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { |
创建单个工作线程的Executor
,等同于newFixedThreadPool(1, threadFactory)
,返回的Executor
不可再重新配置。
newScheduledThreadPool
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
创建的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期同步数据。
newWorkStealingPool
1 | public static ExecutorService newWorkStealingPool() { |
jdk 1.8
中出现,创建一个work-stealing
的线程池,内部ForkJoinPool
使用一个并行因子来创建,默认为主机CPU的可用核心数。
实现原理
可以从方法内部的实例化代码看出,前三者都是ThreadPoolExecutor
类实现的,newScheduledThreadPool
返回类型都发生了变化,其实现是ScheduledThreadPoolExecutor
,另外newWorkStealingPool
返回值没有变化,说明暴露给外部的使用上没有变,内部使用ForkJoinPool
来做了优化。
ThreadPoolExecutor 线程池内部状态
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // ctl 包含了两个概念,因为两个的关联关系,巧妙的组合在一起;高3位表示线程池状态; 低29位 表示workerCount |
RUNNING
: 线程池会接收新任务,并处理排队的任务SHUTDOWN
: 线程池不接收新任务,但会处理队列中的任务STOP
: 线程池不接收新人无,不处理队列中的任务,并中断正在运行的任务TIDYING
: 所有任务已经终止,workCount为零,线程过渡到TIDYING状态TERMINATED
: terminated() 钩子方法运行完毕
任务提交
线程池框架提供了两种方式提交任务:
Executor.execute(Runnable command)
返回void, 不关心返回值1
void execute(Runnable command);
ExecutorService.submit(Callable<T> task)
返回Future<T>
1
<T> Future<T> submit(Callable<T> task)
ThreadPoolExecutor.execute 的实现
1 | int c = ctl.get(); // 获取原子变量的值 |
addWorker的实现
addWorker
方法主要是创建线程,执行任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 重试标记层级
for (;;) { // 无限定条件的for
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; // 线程池状态不满足则直接返回 false
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || // 如果 workerCount 大于 容量
wc >= (core ? corePoolSize : maximumPoolSize))
// 如果 workerCount 大于 核心线程数(外部以核心线程数作为判断依据时) 或 workerCount 大于 最大线程数(外部以最大线程数作为判断依据时)
return false; // 容量受限,返回false
if (compareAndIncrementWorkerCount(c)) // 通过上面的检测,并更新数值成功
break retry; // 跳出 多层循环,往方法的下半部分继续执行
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 上面未设值成功,状态 没有变化
continue retry; // 继续外部循环
// else CAS failed due to workerCount change; retry inner loop
}
}
以上是这个方法的前半部分,主要是线程池状态检测、线程池数量限制检测、线程池相关数量与状态的更新。以下是下半部分代码,主要是创建线程,执行任务:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 线程池的工作 通过 Worker 类,Worker 类继承了 AQS (AbstractQueuedSynchronizer)
final Thread t = w.thread; // 线程是取的 worker中的线程,而worker中的线程是线程池初始化的 线程工厂创建的
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // ReentrantLock 锁的保证下,插入到 workers(HashSet结构)中
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 加入 hashSet 中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 添加成功 开始 运行
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
Worker
1 | Worker(Runnable firstTask) { // |
runWorker实现
1 | final void runWorker(Worker w) { |
getTask 实现
1 | private Runnable getTask() { |
Future 和 Callable 的实现
Demo 2
1 | public class ExecutorCase2 { |
在实际业务场景中,Future
与Callable
一般是成对出现的,Callable
负责执行任务产生结果,Future
则是负责获取结果
Callable
接口类似Runnable
接口,只是Runnable
没有返回值。所以如果你关心你每个任务的执行返回结果,就可以采用Callable
,否则你就直接使用Runnable
就好了。Callable
执行的任务如果发生异常,该异常也会被返回,即Future
可以拿到异步执行任务的各种结果。Future.get
方法是阻塞的,直到Callable
任务执行完成
submit实现
1 | public <T> Future<T> submit(Callable<T> task) { |
Callable
任务通过submit()
方法被封装为一个RunnableFuture
的FutureTask
.
FutureTask
1 | /* |
- state 存储
FutureTask
的状态, - 构造初始状态为
NEW
,构造函数使用callable
成员变量存储了入参callable
任务 FutureTask
实现了Runnable
接口,最终实际执行的是FutureTask
中的run
方法
FutureTask.get实现
1 | public V get() throws InterruptedException, ExecutionException { |
内部通过awaitDone
方法阻塞,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
- 如果线程被中断,则抛出异常
- 如果状态大于
COMPLETING
说明已经完成,直接返回状态即可 - 如果状态等于
COMPLETING
说明已经完成,使用yield
让渡一下cpu
,state
则会过度到NORMAL
了 - 通过
WaitNode
简单链表封装当前线程,并通过UNSAFE
添加到waiters
链表 - 最终通过
LockSupport
的park
或parkNanos
来挂起线程,另外finishCompletion
方法中会unpark
FutureTask.run 实现
1 | public void run() { |
run
方法是线程池中的线程来执行的,而非主线程- 执行
callable.call
方法来运行任务 call
通过时用set
方法来保存结果call
出现异常时用setException
方法来保持异常信息
set/setException
1 | protected void set(V v) { |
通过UNSAFE
修改了FutureTask
的状态,最终都通过调用finishCompletion
方法通知主线程任务完成。
finishCompletion
1 | private void finishCompletion() { |
- 更新
waiters
的值 LockSupport.unpark(t)
唤醒主线程
主要参考:
- 占小狼-深入分析java线程池的实现原理
- 《JAVA并发编程实践》