问题引入
2018 年,一个大型电商平台的秒杀系统在生产环境崩溃了。排查发现,系统使用了 Executors.newFixedThreadPool(200) 处理用户请求,但在秒杀高峰期,队列中积压了数十万个任务,内存耗尽导致 OOM。
问题的根源不在线程数,而在队列——newFixedThreadPool 底层用的是无界 LinkedBlockingQueue。当请求速度持续大于处理速度时,任务无限堆积,最终撑爆内存。
线程池是 Java 并发编程中使用频率最高的基础设施,也是参数配置最容易出错的组件。"核心线程数设多少合适""队列选有界还是无界""拒绝策略用哪个"——这些问题没有标准答案,只有对业务场景的深度理解。
理解线程池的关键,不是记住 "CPU 密集型用 N+1,IO 密集型用 2N" 这种教条,而是理解任务在线程池中的完整生命周期,以及每个参数在这个生命周期中的决策点。
核心概念
ThreadPoolExecutor 的七参数
线程池的构造方法有七个参数,每个参数控制任务生命周期的不同阶段:
java
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务等待队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
读图导引:绿色节点是优先路径——只要有空的核心线程位,就创建新线程处理任务,不放入队列。蓝色节点是缓冲路径——核心线程满后,任务进入队列等待。粉色节点是最后的防线——队列和线程数都满后,触发拒绝策略。
为什么要先创建线程再入队?
这是 ThreadPoolExecutor 的设计哲学:能立即执行就不排队。如果任务一到就先入队,核心线程可能处于空闲状态,而任务在队列中等待——这显然不合理。所以流程是:先填满核心线程,再入队,队列满后再扩容到最大线程。
四种拒绝策略
当线程数达到 maximumPoolSize 且队列满时,新任务触发拒绝策略:
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy | 抛出 RejectedExecutionException |
默认策略,快速失败 |
| CallerRunsPolicy | 由提交任务的线程(调用者)自己执行 | 反压机制,降低提交速度 |
| DiscardPolicy | 静默丢弃任务 | 允许丢数据的场景 |
| DiscardOldestPolicy | 丢弃队列中最老的任务,然后重试提交 | 老任务价值低的场景 |
读图导引:粉色节点(AbortPolicy)是最激进的方式——直接告诉调用方"我处理不了"。蓝色节点(CallerRunsPolicy)是最温和的方式——让提交者自己执行,天然形成反压(调用方变慢,提交速度自然下降)。
CallerRunsPolicy 的反压原理:
java
// 当线程池满载时,提交任务的线程(主线程)自己执行 run()
// 这意味着主线程被"阻塞"了,无法继续提交新任务
// 天然形成流量控制——生产速度被迫降低
new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
面试官视角:问 "拒绝策略选哪个" 没有标准答案,但要能分析场景。AbortPolicy 适合"不允许丢数据且调用方能处理异常"的场景;CallerRunsPolicy 适合"需要自我保护的系统",因为反压效果天然;DiscardPolicy 只在"数据可丢失"时使用(如日志采集)。
线程的生命周期与回收
线程池中的线程分为两类:
- 核心线程:创建后一直存活(除非设置
allowCoreThreadTimeOut(true)) - 非核心线程:空闲超过
keepAliveTime后被回收
读图导引:蓝色节点是空闲等待——线程从队列 poll(keepAliveTime, TimeUnit) 取任务,超时后进入回收(粉色节点)。核心线程默认不超时,但可以通过 allowCoreThreadTimeOut(true) 让核心线程也能被回收。
Worker 的结构:
java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // Worker 持有的线程
Runnable firstTask; // 初始任务(可能为 null)
volatile long completedTasks; // 完成的任务数
// Worker 继承 AQS,用 AQS 的 state 表示锁状态
// state = 0: 空闲(可被回收)
// state = 1: 运行中(正在执行任务)
}
Worker 继承 AQS 是一个巧妙的设计——线程池用 AQS 的独占锁语义来保护 interrupt 操作:只有在线程空闲时(state=0)才能中断它,运行中(state=1)的中断被延迟到任务完成后。
原理分析
任务执行的完整路径
当一个 Runnable 被提交到线程池,它经历了什么?
java
public void execute(Runnable command) {
int c = ctl.get(); // ctl 是一个 AtomicInteger,高3位存运行状态,低29位存线程数
// 步骤1:线程数 < corePoolSize,创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return;
c = ctl.get();
}
// 步骤2:线程池运行中,尝试入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次检查:入队后线程池是否被关闭了
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 至少保证一个线程在处理队列
return;
}
// 步骤3:队列满,创建非核心线程
if (!addWorker(command, false))
reject(command); // 步骤4:达到 maximumPoolSize,拒绝
}
关键设计:
-
ctl 合并变量:用一个
AtomicInteger同时存储运行状态(高 3 位:RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED)和线程数(低 29 位)。这样修改状态和线程数时可以原子操作。 -
二次检查:任务入队后,再次检查线程池状态。如果入队的同时线程池被
shutdown(),需要从队列中移除任务并拒绝。这是一个经典的并发编程模式——"先操作再验证"。 -
addWorker 的第二个参数:
true表示创建核心线程(受corePoolSize限制),false表示创建非核心线程(受maximumPoolSize限制)。
ForkJoinPool 的工作窃取
ForkJoinPool 是 JDK7 引入的专用线程池,核心场景是分治算法(如归并排序、MapReduce)。它的核心创新是工作窃取(Work-Stealing):
读图导引:每个 Worker 线程维护自己的双端任务队列(Deque)。正常时从队列头部取任务执行(LIFO)。当自己的队列空时,从其他 Worker 队列的尾部窃取任务(FIFO)。窃取从尾部进行是为了减少与队列主人的竞争。
为什么工作窃取高效?
- 无全局队列:没有所有线程竞争的中心队列,消除了热点
- 本地优先:线程优先执行自己分解出来的子任务,缓存局部性好
- 负载均衡:空闲线程自动从忙碌线程窃取任务,无需中心调度
CompletableFuture 和 Parallel Stream 的默认线程池:
java
// CompletableFuture 默认使用 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> compute()); // common pool
// Parallel Stream 默认也使用 common pool
list.parallelStream().map(x -> transform(x)); // common pool
// ForkJoinPool.commonPool() 的线程数 = Runtime.availableProcessors() - 1
暗面:
ForkJoinPool.commonPool()的线程数默认是CPU 核数 - 1。如果所有 common pool 线程都被长时间阻塞(如 IO 操作),整个 common pool 就瘫痪了——其他用 common pool 的 CompletableFuture 和 Parallel Stream 都会饿死。解决方式是自定义线程池:CompletableFuture.supplyAsync(() -> ..., customExecutor)。
Executors 工厂方法的陷阱
Executors 提供了几个便捷的工厂方法,但在生产环境中几乎都是陷阱:
java
// 陷阱1:无界队列导致 OOM
ExecutorService pool = Executors.newFixedThreadPool(200);
// 底层:new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>())
// LinkedBlockingQueue 默认容量是 Integer.MAX_VALUE!
// 陷阱2:无限线程导致 OOM
ExecutorService pool = Executors.newCachedThreadPool();
// 底层:new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
// new SynchronousQueue<Runnable>())
// maximumPoolSize = Integer.MAX_VALUE,可以创建无限线程
// 陷阱3:单线程但无界队列
ExecutorService pool = Executors.newSingleThreadExecutor();
// 同样是 LinkedBlockingQueue(Integer.MAX_VALUE)
// 陷阱4:ScheduledThreadPool 允许核心线程无限增长
ExecutorService pool = Executors.newScheduledThreadPool(10);
// 底层 DelayedWorkQueue 是无界的
读图导引:粉色节点是 OOM 风险点。所有 Executors 的便捷工厂方法都使用了无界队列或无限线程,在"生产速度 > 消费速度"的场景下都会导致资源耗尽。
阿里巴巴 Java 开发手册的强制规定:
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
正确的线程池构造方式:
java
// 有界队列 + CallerRunsPolicy 反压
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // corePoolSize
8, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
new ArrayBlockingQueue<>(100), // 有界队列!
new ThreadFactory() { // 自定义线程名
private final AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "biz-pool-" + count.incrementAndGet());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 反压策略
);
实战/源码
动态调整线程池参数
ThreadPoolExecutor 提供了运行时调整参数的能力:
java
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
// 动态修改核心线程数
executor.setCorePoolSize(10);
// 动态修改最大线程数
executor.setMaximumPoolSize(20);
// 动态修改拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 预启动所有核心线程(避免任务提交时才创建线程的延迟)
executor.prestartAllCoreThreads();
优雅关闭线程池
java
// 步骤1:停止接受新任务
executor.shutdown();
// 步骤2:等待已有任务完成(超时后强制关闭)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 步骤3:取消所有未执行的任务
List<Runnable> droppedTasks = executor.shutdownNow();
System.err.println("未执行的任务数:" + droppedTasks.size());
}
shutdown() vs shutdownNow():
shutdown():将线程池状态设为 SHUTDOWN,不再接受新任务,但等待队列中的任务会执行完shutdownNow():将状态设为 STOP,尝试中断所有正在执行的线程,返回队列中未执行的任务列表
监控线程池状态
java
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
// 任务总数
long taskCount = executor.getTaskCount();
// 已完成任务数
long completed = executor.getCompletedTaskCount();
// 当前活跃线程数
int active = executor.getActiveCount();
// 队列中的任务数
int queued = executor.getQueue().size();
// 最大同时存在的线程数
int largest = executor.getLargestPoolSize();
生产环境中建议将线程池的关键指标暴露到监控系统(如 Prometheus),设置告警规则:
- 队列积压任务数 > 阈值
- 活跃线程数 / maximumPoolSize > 80%
- 任务拒绝频率 > 0
ForkJoinPool 自定义并行度
java
// 避免阻塞 common pool
ForkJoinPool customPool = new ForkJoinPool(4);
try {
customPool.submit(() ->
list.parallelStream().map(this::blockingTransform).collect(Collectors.toList())
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
常见问题
Q1:核心线程数怎么设置?
没有银弹,但有一些经验公式:
- CPU 密集型(计算为主):
corePoolSize = CPU 核数 + 1。+1 是为了在某个线程因页缺失或其他原因暂停时,额外的线程能充分利用 CPU。 - IO 密集型(网络/磁盘 IO 为主):
corePoolSize = CPU 核数 × 2或更大。因为线程等待 IO 时不占用 CPU,可以多开一些线程保持 CPU 忙碌。 - 混合场景:通过压测确定。可以参照 Little's Law:
并发度 = 吞吐量 × 平均响应时间。
Q2:为什么 ThreadPoolExecutor 要先创建线程再入队?
因为"立即执行"优于"排队等待"。如果任务一到就先入队,而核心线程处于空闲状态,任务的响应时间会无故增加。所以设计为:先尽量用线程处理,线程满后再缓冲到队列。
Q3:maximumPoolSize 怎么设?
取决于系统能承载的最大线程数。每个线程默认占用 1MB 栈空间(-Xss 参数),如果 maximumPoolSize = 1000,仅线程栈就占用约 1GB 内存。此外,线程切换也有开销。建议通过压测找到系统的吞吐量拐点。
Q4:CallerRunsPolicy 会导致主线程阻塞吗?
是的,这就是它的设计目的。当线程池满载时,提交任务的线程(通常是主线程或请求处理线程)自己执行 run(),这意味着它不能继续提交新任务——天然形成反压。但如果 Caller 是主线程且任务执行时间长,会导致整个应用"卡住"。
Q5:ForkJoinPool.commonPool() 线程数可以改吗?
可以通过 JVM 参数调整:
bash
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
默认是 Runtime.availableProcessors() - 1。
总结
线程池的设计是一场精密的资源调度博弈:
- 七参数模型:corePoolSize(核心产能)→ workQueue(缓冲容量)→ maximumPoolSize(最大产能)→ handler(过载保护)。任务按这个优先级路径被处理
- 拒绝策略:AbortPolicy 快速失败、CallerRunsPolicy 反压、DiscardPolicy 静默丢弃。CallerRunsPolicy 的"自我执行"机制是天然的流量控制
- 线程回收:非核心线程空闲
keepAliveTime后被回收;核心线程默认永不回收,但可通过allowCoreThreadTimeOut(true)改变 - ForkJoinPool 工作窃取:每个 Worker 维护双端队列,本地 LIFO 执行、空闲时从其他队列尾部 FIFO 窃取。消除了中心队列的热点,是并行计算的最优调度模型
- Executors 工厂方法的陷阱:全部使用无界队列或无限线程,生产环境必须用
ThreadPoolExecutor手动构造,指定有界队列和明确的拒绝策略
理解线程池的关键,是追问:当任务提交速度持续大于处理速度时,系统的行为是什么? 无界队列会 OOM,CallerRunsPolicy 会反压,AbortPolicy 会抛异常——没有完美的策略,只有对业务场景的适配。