问题引入

2018 年,一个大型电商平台的秒杀系统在生产环境崩溃了。排查发现,系统使用了 Executors.newFixedThreadPool(200) 处理用户请求,但在秒杀高峰期,队列中积压了数十万个任务,内存耗尽导致 OOM。

问题的根源不在线程数,而在队列——newFixedThreadPool 底层用的是无界 LinkedBlockingQueue。当请求速度持续大于处理速度时,任务无限堆积,最终撑爆内存。

线程池是 Java 并发编程中使用频率最高的基础设施,也是参数配置最容易出错的组件。"核心线程数设多少合适""队列选有界还是无界""拒绝策略用哪个"——这些问题没有标准答案,只有对业务场景的深度理解。

理解线程池的关键,不是记住 "CPU 密集型用 N+1,IO 密集型用 2N" 这种教条,而是理解任务在线程池中的完整生命周期,以及每个参数在这个生命周期中的决策点。

核心概念

ThreadPoolExecutor 的七参数

线程池的构造方法有七个参数,每个参数控制任务生命周期的不同阶段:

java 复制代码
public ThreadPoolExecutor(
    int corePoolSize,           // 核心线程数
    int maximumPoolSize,        // 最大线程数
    long keepAliveTime,         // 非核心线程空闲存活时间
    TimeUnit unit,              // 时间单位
    BlockingQueue<Runnable> workQueue,  // 任务等待队列
    ThreadFactory threadFactory,        // 线程工厂
    RejectedExecutionHandler handler    // 拒绝策略
)
graph TD subgraph "任务提交流程" T[任务提交] -->|核心线程数 < corePoolSize| C[创建核心线程] T -->|核心线程满| Q[进入 workQueue] Q -->|队列满| M[创建非核心线程] M -->|达到 maximumPoolSize| R[触发拒绝策略] end style C fill:#9f9,stroke:#333,stroke-width:2px style Q fill:#bbf,stroke:#333,stroke-width:2px style R fill:#f9f,stroke:#333,stroke-width:2px

读图导引:绿色节点是优先路径——只要有空的核心线程位,就创建新线程处理任务,不放入队列。蓝色节点是缓冲路径——核心线程满后,任务进入队列等待。粉色节点是最后的防线——队列和线程数都满后,触发拒绝策略。

为什么要先创建线程再入队?

这是 ThreadPoolExecutor 的设计哲学:能立即执行就不排队。如果任务一到就先入队,核心线程可能处于空闲状态,而任务在队列中等待——这显然不合理。所以流程是:先填满核心线程,再入队,队列满后再扩容到最大线程。

四种拒绝策略

当线程数达到 maximumPoolSize 且队列满时,新任务触发拒绝策略:

策略 行为 适用场景
AbortPolicy 抛出 RejectedExecutionException 默认策略,快速失败
CallerRunsPolicy 由提交任务的线程(调用者)自己执行 反压机制,降低提交速度
DiscardPolicy 静默丢弃任务 允许丢数据的场景
DiscardOldestPolicy 丢弃队列中最老的任务,然后重试提交 老任务价值低的场景
graph TD subgraph "拒绝策略对比" A[任务到达 线程池满载] -->|AbortPolicy| B[抛异常] A -->|CallerRunsPolicy| C[调用者线程执行] A -->|DiscardPolicy| D[静默丢弃] A -->|DiscardOldestPolicy| E[丢弃最老任务] end style B fill:#f9f,stroke:#333,stroke-width:2px style C fill:#bbf,stroke:#333,stroke-width:2px

读图导引:粉色节点(AbortPolicy)是最激进的方式——直接告诉调用方"我处理不了"。蓝色节点(CallerRunsPolicy)是最温和的方式——让提交者自己执行,天然形成反压(调用方变慢,提交速度自然下降)。

CallerRunsPolicy 的反压原理

java 复制代码
// 当线程池满载时,提交任务的线程(主线程)自己执行 run()
// 这意味着主线程被"阻塞"了,无法继续提交新任务
// 天然形成流量控制——生产速度被迫降低
new ThreadPoolExecutor(
    4, 8, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

面试官视角:问 "拒绝策略选哪个" 没有标准答案,但要能分析场景。AbortPolicy 适合"不允许丢数据且调用方能处理异常"的场景;CallerRunsPolicy 适合"需要自我保护的系统",因为反压效果天然;DiscardPolicy 只在"数据可丢失"时使用(如日志采集)。

线程的生命周期与回收

线程池中的线程分为两类:

  • 核心线程:创建后一直存活(除非设置 allowCoreThreadTimeOut(true)
  • 非核心线程:空闲超过 keepAliveTime 后被回收
graph TD subgraph "线程生命周期" NEW[创建 Worker] -->|运行任务| RUN[执行任务] RUN -->|任务完成| IDLE[从队列取任务] IDLE -->|获取到任务| RUN IDLE -->|超时 keepAliveTime| REC[线程回收] IDLE -->|线程池关闭| REC REC -->|allowCoreThreadTimeOut| CORE[核心线程也可回收] end style IDLE fill:#bbf,stroke:#333,stroke-width:2px style REC fill:#f9f,stroke:#333,stroke-width:2px

读图导引:蓝色节点是空闲等待——线程从队列 poll(keepAliveTime, TimeUnit) 取任务,超时后进入回收(粉色节点)。核心线程默认不超时,但可以通过 allowCoreThreadTimeOut(true) 让核心线程也能被回收。

Worker 的结构

java 复制代码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;      // Worker 持有的线程
    Runnable firstTask;       // 初始任务(可能为 null)
    volatile long completedTasks;  // 完成的任务数
    
    // Worker 继承 AQS,用 AQS 的 state 表示锁状态
    // state = 0: 空闲(可被回收)
    // state = 1: 运行中(正在执行任务)
}

Worker 继承 AQS 是一个巧妙的设计——线程池用 AQS 的独占锁语义来保护 interrupt 操作:只有在线程空闲时(state=0)才能中断它,运行中(state=1)的中断被延迟到任务完成后。

原理分析

任务执行的完整路径

当一个 Runnable 被提交到线程池,它经历了什么?

java 复制代码
public void execute(Runnable command) {
    int c = ctl.get();  // ctl 是一个 AtomicInteger,高3位存运行状态,低29位存线程数
    
    // 步骤1:线程数 < corePoolSize,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) return;
        c = ctl.get();
    }
    
    // 步骤2:线程池运行中,尝试入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 二次检查:入队后线程池是否被关闭了
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);  // 至少保证一个线程在处理队列
        return;
    }
    
    // 步骤3:队列满,创建非核心线程
    if (!addWorker(command, false))
        reject(command);  // 步骤4:达到 maximumPoolSize,拒绝
}

关键设计

  1. ctl 合并变量:用一个 AtomicInteger 同时存储运行状态(高 3 位:RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED)和线程数(低 29 位)。这样修改状态和线程数时可以原子操作。

  2. 二次检查:任务入队后,再次检查线程池状态。如果入队的同时线程池被 shutdown(),需要从队列中移除任务并拒绝。这是一个经典的并发编程模式——"先操作再验证"。

  3. addWorker 的第二个参数true 表示创建核心线程(受 corePoolSize 限制),false 表示创建非核心线程(受 maximumPoolSize 限制)。

ForkJoinPool 的工作窃取

ForkJoinPool 是 JDK7 引入的专用线程池,核心场景是分治算法(如归并排序、MapReduce)。它的核心创新是工作窃取(Work-Stealing)

graph TD subgraph "工作窃取算法" W1[Worker1 双端队列] -->|本地任务| W1T[头部取任务] W2[Worker2 双端队列] -->|本地任务| W2T[头部取任务] W3[Worker3 双端队列] -->|本地为空| W3S[从其他队列尾部窃取] W3S -.->|窃取| W2 end style W3S fill:#f9f,stroke:#333,stroke-width:2px

读图导引:每个 Worker 线程维护自己的双端任务队列(Deque)。正常时从队列头部取任务执行(LIFO)。当自己的队列空时,从其他 Worker 队列的尾部窃取任务(FIFO)。窃取从尾部进行是为了减少与队列主人的竞争。

为什么工作窃取高效?

  • 无全局队列:没有所有线程竞争的中心队列,消除了热点
  • 本地优先:线程优先执行自己分解出来的子任务,缓存局部性好
  • 负载均衡:空闲线程自动从忙碌线程窃取任务,无需中心调度

CompletableFuture 和 Parallel Stream 的默认线程池

java 复制代码
// CompletableFuture 默认使用 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> compute());  // common pool

// Parallel Stream 默认也使用 common pool
list.parallelStream().map(x -> transform(x));     // common pool

// ForkJoinPool.commonPool() 的线程数 = Runtime.availableProcessors() - 1

暗面ForkJoinPool.commonPool() 的线程数默认是 CPU 核数 - 1。如果所有 common pool 线程都被长时间阻塞(如 IO 操作),整个 common pool 就瘫痪了——其他用 common pool 的 CompletableFuture 和 Parallel Stream 都会饿死。解决方式是自定义线程池:CompletableFuture.supplyAsync(() -> ..., customExecutor)

Executors 工厂方法的陷阱

Executors 提供了几个便捷的工厂方法,但在生产环境中几乎都是陷阱:

java 复制代码
// 陷阱1:无界队列导致 OOM
ExecutorService pool = Executors.newFixedThreadPool(200);
// 底层:new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
//                              new LinkedBlockingQueue<Runnable>())
// LinkedBlockingQueue 默认容量是 Integer.MAX_VALUE!

// 陷阱2:无限线程导致 OOM
ExecutorService pool = Executors.newCachedThreadPool();
// 底层:new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
//                              new SynchronousQueue<Runnable>())
// maximumPoolSize = Integer.MAX_VALUE,可以创建无限线程

// 陷阱3:单线程但无界队列
ExecutorService pool = Executors.newSingleThreadExecutor();
// 同样是 LinkedBlockingQueue(Integer.MAX_VALUE)

// 陷阱4:ScheduledThreadPool 允许核心线程无限增长
ExecutorService pool = Executors.newScheduledThreadPool(10);
// 底层 DelayedWorkQueue 是无界的
graph TD subgraph "Executors 工厂方法陷阱" F1[newFixedThreadPool] -->|无界队列| O1[OOM] F2[newCachedThreadPool] -->|无限线程| O2[OOM] F3[newSingleThreadExecutor] -->|无界队列| O3[OOM] end style O1 fill:#f9f,stroke:#333,stroke-width:2px style O2 fill:#f9f,stroke:#333,stroke-width:2px style O3 fill:#f9f,stroke:#333,stroke-width:2px

读图导引:粉色节点是 OOM 风险点。所有 Executors 的便捷工厂方法都使用了无界队列或无限线程,在"生产速度 > 消费速度"的场景下都会导致资源耗尽。

阿里巴巴 Java 开发手册的强制规定

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

正确的线程池构造方式

java 复制代码
// 有界队列 + CallerRunsPolicy 反压
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    4,                      // corePoolSize
    8,                      // maximumPoolSize
    60L, TimeUnit.SECONDS,  // keepAliveTime
    new ArrayBlockingQueue<>(100),  // 有界队列!
    new ThreadFactory() {           // 自定义线程名
        private final AtomicInteger count = new AtomicInteger(0);
        public Thread newThread(Runnable r) {
            return new Thread(r, "biz-pool-" + count.incrementAndGet());
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()  // 反压策略
);

实战/源码

动态调整线程池参数

ThreadPoolExecutor 提供了运行时调整参数的能力:

java 复制代码
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);

// 动态修改核心线程数
executor.setCorePoolSize(10);

// 动态修改最大线程数
executor.setMaximumPoolSize(20);

// 动态修改拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// 预启动所有核心线程(避免任务提交时才创建线程的延迟)
executor.prestartAllCoreThreads();

优雅关闭线程池

java 复制代码
// 步骤1:停止接受新任务
executor.shutdown();

// 步骤2:等待已有任务完成(超时后强制关闭)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
    // 步骤3:取消所有未执行的任务
    List<Runnable> droppedTasks = executor.shutdownNow();
    System.err.println("未执行的任务数:" + droppedTasks.size());
}

shutdown() vs shutdownNow()

  • shutdown():将线程池状态设为 SHUTDOWN,不再接受新任务,但等待队列中的任务会执行完
  • shutdownNow():将状态设为 STOP,尝试中断所有正在执行的线程,返回队列中未执行的任务列表

监控线程池状态

java 复制代码
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);

// 任务总数
long taskCount = executor.getTaskCount();

// 已完成任务数
long completed = executor.getCompletedTaskCount();

// 当前活跃线程数
int active = executor.getActiveCount();

// 队列中的任务数
int queued = executor.getQueue().size();

// 最大同时存在的线程数
int largest = executor.getLargestPoolSize();

生产环境中建议将线程池的关键指标暴露到监控系统(如 Prometheus),设置告警规则:

  • 队列积压任务数 > 阈值
  • 活跃线程数 / maximumPoolSize > 80%
  • 任务拒绝频率 > 0

ForkJoinPool 自定义并行度

java 复制代码
// 避免阻塞 common pool
ForkJoinPool customPool = new ForkJoinPool(4);
try {
    customPool.submit(() ->
        list.parallelStream().map(this::blockingTransform).collect(Collectors.toList())
    ).get();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    customPool.shutdown();
}

常见问题

Q1:核心线程数怎么设置?

没有银弹,但有一些经验公式:

  • CPU 密集型(计算为主):corePoolSize = CPU 核数 + 1。+1 是为了在某个线程因页缺失或其他原因暂停时,额外的线程能充分利用 CPU。
  • IO 密集型(网络/磁盘 IO 为主):corePoolSize = CPU 核数 × 2 或更大。因为线程等待 IO 时不占用 CPU,可以多开一些线程保持 CPU 忙碌。
  • 混合场景:通过压测确定。可以参照 Little's Law:并发度 = 吞吐量 × 平均响应时间

Q2:为什么 ThreadPoolExecutor 要先创建线程再入队?

因为"立即执行"优于"排队等待"。如果任务一到就先入队,而核心线程处于空闲状态,任务的响应时间会无故增加。所以设计为:先尽量用线程处理,线程满后再缓冲到队列。

Q3:maximumPoolSize 怎么设?

取决于系统能承载的最大线程数。每个线程默认占用 1MB 栈空间(-Xss 参数),如果 maximumPoolSize = 1000,仅线程栈就占用约 1GB 内存。此外,线程切换也有开销。建议通过压测找到系统的吞吐量拐点。

Q4:CallerRunsPolicy 会导致主线程阻塞吗?

是的,这就是它的设计目的。当线程池满载时,提交任务的线程(通常是主线程或请求处理线程)自己执行 run(),这意味着它不能继续提交新任务——天然形成反压。但如果 Caller 是主线程且任务执行时间长,会导致整个应用"卡住"。

Q5:ForkJoinPool.commonPool() 线程数可以改吗?

可以通过 JVM 参数调整:

bash 复制代码
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

默认是 Runtime.availableProcessors() - 1

总结

线程池的设计是一场精密的资源调度博弈:

  • 七参数模型:corePoolSize(核心产能)→ workQueue(缓冲容量)→ maximumPoolSize(最大产能)→ handler(过载保护)。任务按这个优先级路径被处理
  • 拒绝策略:AbortPolicy 快速失败、CallerRunsPolicy 反压、DiscardPolicy 静默丢弃。CallerRunsPolicy 的"自我执行"机制是天然的流量控制
  • 线程回收:非核心线程空闲 keepAliveTime 后被回收;核心线程默认永不回收,但可通过 allowCoreThreadTimeOut(true) 改变
  • ForkJoinPool 工作窃取:每个 Worker 维护双端队列,本地 LIFO 执行、空闲时从其他队列尾部 FIFO 窃取。消除了中心队列的热点,是并行计算的最优调度模型
  • Executors 工厂方法的陷阱:全部使用无界队列或无限线程,生产环境必须用 ThreadPoolExecutor 手动构造,指定有界队列和明确的拒绝策略

理解线程池的关键,是追问:当任务提交速度持续大于处理速度时,系统的行为是什么? 无界队列会 OOM,CallerRunsPolicy 会反压,AbortPolicy 会抛异常——没有完美的策略,只有对业务场景的适配。