问题引入

2015 年,某金融系统的交易接口从同步模型迁移到异步模型。开发者用 FutureTask 包装了所有的下游 RPC 调用,期望通过并行调用降低延迟。但很快发现问题:FutureTask.get() 是阻塞的,主线程需要等所有 Future 完成才能继续处理。所谓的"异步"只是并行,不是真正的非阻塞。

更致命的是异常处理——当一个 Future 抛出异常时,其他并行的 Future 结果已经不可用了,但 FutureTask 没有提供"异常时回退到默认值"的机制。开发者被迫写大量的 try-catch 嵌套,代码变成了回调地狱。

JDK8 引入的 CompletableFuture 解决了这些问题。它用回调链替代了阻塞等待,用函数式组合替代了嵌套回调。但与此同时,另一个问题浮出水面:异步任务在线程池的不同线程间跳转时,ThreadLocal 中的上下文数据丢失了。

这是两个方向相反但紧密相关的问题:CompletableFuture 向外扩展(异步编排),ThreadLocal 向内收缩(线程隔离)。理解它们的关键,是理解任务如何在不同线程间流转,以及数据如何绑定到线程生命周期

核心概念

FutureTask 的局限性

FutureTask 是 JDK5 引入的异步任务包装器,但它有两个根本缺陷:

  1. get() 阻塞:获取结果时必须等待任务完成,无法注册回调
  2. 无法组合:多个 Future 之间没有组合操作,无法表达"A 完成后执行 B"或"A 和 B 都完成后执行 C"
java 复制代码
// FutureTask 的阻塞模式
Future<String> future = executor.submit(() -> fetchUser(userId));
String user = future.get();  // 阻塞!主线程必须等

// 多个 Future 的组合噩梦
Future<String> f1 = executor.submit(() -> serviceA());
Future<String> f2 = executor.submit(() -> serviceB());
// 想等两个都完成后执行 C?自己写轮询或 CountDownLatch
graph TD subgraph "FutureTask 阻塞模式" M[主线程] -->|submit| W1[Worker 线程1] M -->|submit| W2[Worker 线程2] M -->|get 阻塞等待| R[结果] W1 -->|完成后唤醒| M W2 -->|完成后唤醒| M end style M fill:#f9f,stroke:#333,stroke-width:2px

读图导引:粉色节点是主线程——在 get() 调用后进入阻塞状态,等待 Worker 线程完成后被唤醒。这种"提交后等待"的模式浪费了主线程的执行能力。

CompletableFuture 的 Completion 链表

CompletableFuture 的核心设计是不阻塞等待,而是注册回调。每个 CompletableFuture 内部维护一个 Completion 链表,当自身完成时,依次触发链表上的回调:

java 复制代码
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Object result;       // 结果或异常(AltResult 包装)
    volatile Completion stack;    // 等待触发的回调栈
}
graph LR subgraph "CompletableFuture Completion 链表" CF[CF1 完成时] -->|触发| C1[thenApply 转换] C1 -->|触发| C2[thenAccept 消费] C2 -->|触发| C3[thenRun 副作用] CF2[CF2 完成时] -->|触发| C4[thenCombine 合并] C4 -->|等待| CF1 end style CF fill:#9f9,stroke:#333,stroke-width:2px style CF2 fill:#bbf,stroke:#333,stroke-width:2px

读图导引:绿色节点是已完成的 CompletableFuture,它会触发自己的 Completion 链表。蓝色节点是另一个 CompletableFuture,两个 CompletableFuture 可以通过 thenCombine 等操作组合成依赖图。

串行操作

java 复制代码
CompletableFuture.supplyAsync(() -> fetchOrder(orderId))
    .thenApply(order -> calculatePrice(order))      // 转换结果
    .thenAccept(price -> logPrice(price))            // 消费结果
    .thenRun(() -> metrics.record("price_calc"));    // 无副作用的后续操作
方法 输入 输出 用途
thenApply T U 转换结果
thenAccept T void 消费结果
thenRun void void 纯副作用

并行组合

java 复制代码
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> serviceA());
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> serviceB());

// 两个都完成后合并
f1.thenCombine(f2, (a, b) -> a + b);

// 任意一个完成就使用
f1.applyToEither(f2, result -> result);

面试官视角:问 "CompletableFuture 和 Future 的区别" 要答出两个点:1) CompletableFuture 支持回调(不阻塞 get);2) CompletableFuture 支持链式组合(thenXxx 方法)。只答"异步"是不够的。

异常处理链

CompletableFuture 的异常处理比 FutureTask 优雅得多:

java 复制代码
CompletableFuture.supplyAsync(() -> {
    if (random.nextBoolean()) throw new RuntimeException("fail");
    return "success";
})
.exceptionally(ex -> {
    // 异常时返回默认值
    log.warn("Failed, using default", ex);
    return "default";
})
.thenApply(result -> result.toUpperCase());
graph TD subgraph "CompletableFuture 异常处理" S[supplyAsync] -->|成功| T[thenApply] S -->|异常| E[exceptionally] E -->|恢复为默认值| T end style E fill:#f9f,stroke:#333,stroke-width:2px

读图导引:粉色节点是异常恢复——当上游抛出异常时,exceptionally 捕获异常并返回替代值,下游的 thenApply 接收到的是替代值而不是异常。

三种异常处理方式的区别

java 复制代码
// exceptionally:异常时返回替代值,恢复正常链路
future.exceptionally(ex -> "default");

// handle:无论成功还是异常,都执行处理函数
future.handle((result, ex) -> {
    if (ex != null) return "fallback";
    return result;
});

// whenComplete:类似 handle 但不改变结果
future.whenComplete((result, ex) -> {
    if (ex != null) log.error("Failed", ex);
});

暗面CompletableFuture 的异常默认不会抛给调用方,而是被"吞掉"——如果没有注册 exceptionallyhandle,异常只会作为结果存储在 CompletableFuture 内部,永远不会被处理。这是生产环境中常见的内存泄漏源(异常对象及其堆栈被长期引用)。

ThreadLocal 的底层结构

ThreadLocal 是 Java 实现线程隔离的基础工具。每个线程拥有自己独立的变量副本,互不干扰。

java 复制代码
public class Thread implements Runnable {
    ThreadLocal.ThreadLocalMap threadLocals = null;          // 普通 ThreadLocal
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; // 可继承的 ThreadLocal
}
graph TD subgraph "ThreadLocal 结构" T1[Thread 1] -->|threadLocals| M1[ThreadLocalMap] T2[Thread 2] -->|threadLocals| M2[ThreadLocalMap] M1 -->|Entry[]| E1["ThreadLocal1 -> Value1"] M1 -->|Entry[]| E2["ThreadLocal2 -> Value2"] M2 -->|Entry[]| E3["ThreadLocal1 -> Value3"] end style M1 fill:#f9f,stroke:#333,stroke-width:2px style M2 fill:#bbf,stroke:#333,stroke-width:2px

读图导引:粉色节点是线程1的 ThreadLocalMap,蓝色节点是线程2的。每个 ThreadLocalMap 内部是 Entry 数组,key 是 ThreadLocal 对象(弱引用),value 是线程私有的值。两个线程使用同一个 ThreadLocal key,但对应不同的 value。

Entry 的特殊设计

java 复制代码
static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;  // value 是强引用!
    Entry(ThreadLocal<?> k, Object v) {
        super(k);   // key 是弱引用
        value = v;  // value 是强引用
    }
}

为什么 key 要设计成弱引用?

如果 key 是强引用,当 ThreadLocal 实例被置为 null(如方法返回后局部变量消失),但线程还在运行时,ThreadLocalMap 中的 Entry 仍然持有 ThreadLocal 的强引用,导致 ThreadLocal 无法被 GC。

用弱引用后,当外部没有强引用指向 ThreadLocal 时,GC 会回收 ThreadLocal 对象,ThreadLocalMap 中的 key 变成 null。但value 仍然是强引用——这就引出了内存泄漏问题。

原理分析

ThreadLocal 的内存泄漏与清理

graph TD subgraph "ThreadLocal 内存泄漏" T[Thread 长期存活] -->|强引用| M[ThreadLocalMap] M -->|Entry key=null| E[Entry] E -->|强引用| V[Value 无法回收] end style E fill:#f9f,stroke:#333,stroke-width:2px style V fill:#f9f,stroke:#333,stroke-width:2px

读图导引:粉色节点展示泄漏场景——Thread 长期存活(如线程池中的 Worker 线程),ThreadLocal 被 GC 后 key 变为 null,但 Entry 的 value 仍然是强引用,value 指向的对象无法被回收,形成内存泄漏。

ThreadLocalMap 的清理机制

ThreadLocalMap 提供了 expungeStaleEntry 方法来清理 key 为 null 的 Entry:

  1. set() 时探测清理:插入新元素时,扫描并清理一定数量的 stale entry
  2. get() 时延迟清理:读取时如果发现 key 为 null,清理该位置
  3. resize() 时全量清理:扩容前遍历整个表,清理所有 stale entry

但这些清理都是延迟的、非保证的。如果线程从不调用 get()set(),stale entry 永远不会被清理。

最佳实践

java 复制代码
// 使用完 ThreadLocal 后手动移除
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<>();

try {
    SimpleDateFormat sdf = DATE_FORMAT.get();
    if (sdf == null) {
        sdf = new SimpleDateFormat("yyyy-MM-dd");
        DATE_FORMAT.set(sdf);
    }
    return sdf.parse(dateStr);
} finally {
    // 或者用 try-with-resources 模式封装
    DATE_FORMAT.remove();  // 关键:使用完后移除!
}

面试官视角:问 "ThreadLocal 为什么会导致内存泄漏" 要答出完整的引用链:Thread → ThreadLocalMap → Entry → value(强引用)。Thread 长期存活(线程池)+ ThreadLocal 外部引用消失 = key 变 null + value 泄漏。

InheritableThreadLocal 的父子传递

InheritableThreadLocal 允许子线程继承父线程的 ThreadLocal 值:

java 复制代码
private static final InheritableThreadLocal<String> context = new InheritableThreadLocal<>();

context.set("parent-value");
new Thread(() -> {
    System.out.println(context.get());  // 输出 "parent-value"
}).start();

原理:创建子线程时,Thread 的构造函数会复制父线程的 inheritableThreadLocals

java 复制代码
// Thread 构造函数(简化)
Thread(parent) {
    if (parent.inheritableThreadLocals != null) {
        this.inheritableThreadLocals = 
            ThreadLocalMap.createInheritedMap(parent.inheritableThreadLocals);
    }
}
graph TD subgraph "InheritableThreadLocal" PT[父线程] -->|inheritableThreadLocals| PM[{"key"="value"}] PT -->|创建子线程| CT[子线程] CT -->|复制| CM[{"key"="value"}] end style PM fill:#9f9,stroke:#333,stroke-width:2px style CM fill:#bbf,stroke:#333,stroke-width:2px

读图导引:绿色节点是父线程的 inheritableThreadLocals。创建子线程时(蓝色节点),值被复制到子线程。注意是复制不是共享——子线程修改不会影响父线程。

线程池场景下的失效

java 复制代码
context.set("user-123");
executor.submit(() -> {
    System.out.println(context.get());  // 可能输出 "user-456" 或其他值!
});

线程池的 Worker 线程是复用的,它们可能是为其他请求创建的,inheritableThreadLocals 里残留了其他请求的上下文。InheritableThreadLocal 只在创建线程时复制父线程的值,而线程池的线程早已创建好了。

解决方案:TransmittableThreadLocal

阿里巴巴开源的 TransmittableThreadLocal(TTL)解决了线程池场景下的上下文传递:

java 复制代码
// 装饰 Runnable,在提交时捕获上下文,执行时还原
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("user-123");

// 用 TtlExecutors 装饰线程池
ExecutorService ttlExecutor = TtlExecutors.getTtlExecutor(executor);
ttlExecutor.submit(() -> {
    System.out.println(context.get());  // 输出 "user-123"
});

原理:装饰 Runnable/Callable,在提交任务时捕获当前线程的所有 TTL 值,在任务执行前还原到 Worker 线程,执行后清理

graph TD subgraph "TTL 上下文透传" T1[提交线程] -->|捕获 context| CAP[快照备份] CAP -->|装饰 Runnable| RUN[TtlRunnable] RUN -->|执行前还原| T2[Worker 线程] T2 -->|执行业务逻辑| T2 T2 -->|执行后清理| T2 end style CAP fill:#f9f,stroke:#333,stroke-width:2px style RUN fill:#bbf,stroke:#333,stroke-width:2px

读图导引:粉色节点是捕获操作——在提交时把当前线程的上下文备份。蓝色节点是装饰后的 Runnable——执行前把备份还原到 Worker 线程,执行后清理。这确保了每个任务看到正确的上下文,而不受线程复用的影响。

暗面:TTL 的"捕获-还原-清理"机制带来了性能开销——每次提交任务都要遍历所有 TTL 变量。对于高并发、低延迟的场景,需要评估这个开销是否可接受。

实战/源码

CompletableFuture 的异常处理最佳实践

java 复制代码
CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> transform(data))
    .exceptionally(ex -> {
        // 记录异常,返回降级数据
        log.error("Data fetch failed", ex);
        return DEFAULT_DATA;
    })
    .whenComplete((result, ex) -> {
        // 无论成功失败,记录 metrics
        metrics.record("data_fetch", result != null);
    });

常见错误:忘记处理异常

java 复制代码
// 危险:异常被吞掉,永远不会被处理
CompletableFuture.runAsync(() -> {
    throw new RuntimeException("boom");
});
// 异常没有被捕获,也没有日志!

// 正确:必须注册异常处理
CompletableFuture.runAsync(() -> {
    throw new RuntimeException("boom");
}).exceptionally(ex -> {
    log.error("Task failed", ex);
    return null;
});

自定义线程池的异步链

java 复制代码
ExecutorService executor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100));

CompletableFuture.supplyAsync(() -> serviceA(), executor)
    .thenCombine(
        CompletableFuture.supplyAsync(() -> serviceB(), executor),
        (a, b) -> merge(a, b)
    )
    .thenApplyAsync(result -> save(result), executor);  // 指定自定义线程池

重要:如果不指定线程池,thenXxx 方法默认使用 ForkJoinPool.commonPool()。如果异步链中有阻塞操作(如 IO),会耗尽 common pool,影响系统中其他使用 common pool 的代码(如 parallelStream)。

ThreadLocal 的正确使用模式

java 复制代码
// 模式1:用静态 ThreadLocal + 手动 remove
public class DateUtil {
    private static final ThreadLocal<SimpleDateFormat> TL = new ThreadLocal<>();
    
    public static Date parse(String str) throws ParseException {
        SimpleDateFormat sdf = TL.get();
        if (sdf == null) {
            sdf = new SimpleDateFormat("yyyy-MM-dd");
            TL.set(sdf);
        }
        try {
            return sdf.parse(str);
        } finally {
            // 如果是线程池场景,必须 remove!
            TL.remove();
        }
    }
}

// 模式2:Java8 的 DateTimeFormatter 是线程安全的,根本不需要 ThreadLocal
public static final DateTimeFormatter FORMATTER = 
    DateTimeFormatter.ofPattern("yyyy-MM-dd");
// FORMATTER.parse(str) 可以直接多线程使用

SimpleDateFormat 不是线程安全的,必须用 ThreadLocal 或每次新建。但 DateTimeFormatter(Java8)是线程安全的,是更好的选择。

线程池上下文透传的完整方案

java 复制代码
// 1. 定义上下文
public class RequestContext {
    private static final TransmittableThreadLocal<String> USER_ID = 
        new TransmittableThreadLocal<>();
    
    public static void setUserId(String userId) { USER_ID.set(userId); }
    public static String getUserId() { return USER_ID.get(); }
    public static void clear() { USER_ID.remove(); }
}

// 2. 装饰线程池
ExecutorService rawExecutor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100));
ExecutorService ttlExecutor = TtlExecutors.getTtlExecutor(rawExecutor);

// 3. 使用
RequestContext.setUserId("user-123");
ttlExecutor.submit(() -> {
    // Worker 线程中也能拿到 userId
    log.info("Processing for user: {}", RequestContext.getUserId());
});

常见问题

Q1:CompletableFuture 默认用什么线程池?

CompletableFuture.supplyAsync()runAsync() 默认使用 ForkJoinPool.commonPool()。带 Executor 参数的重载方法可以指定自定义线程池。强烈建议生产环境中显式指定线程池,避免阻塞 common pool。

Q2:CompletableFuture 的 thenApply 和 thenApplyAsync 有什么区别?

  • thenApply:在完成当前任务的线程中执行回调。如果当前任务是 common pool 的线程执行,回调也在 common pool 中执行。
  • thenApplyAsync:在异步线程池中执行回调,默认 common pool。
java 复制代码
cf.thenApply(x -> x + 1);        // 同线程执行
cf.thenApplyAsync(x -> x + 1);   // 提交到线程池异步执行

Q3:ThreadLocal 的 key 为什么是弱引用?

如果 key 是强引用,当 ThreadLocal 实例的外部引用消失后(如方法返回),ThreadLocalMap 仍然持有 ThreadLocal 的强引用,导致 ThreadLocal 无法被 GC。弱引用允许 GC 在适当时候回收 ThreadLocal,虽然 value 可能泄漏,但至少 key 能被回收,为后续的 expungeStaleEntry 清理创造条件。

Q4:InheritableThreadLocal 在线程池中为什么失效?

因为线程池的 Worker 线程是复用的,InheritableThreadLocal 只在创建线程时复制父线程的值。当线程池中的线程执行新任务时,它的 inheritableThreadLocals 还是之前任务留下的值,不会自动更新为当前提交线程的值。

Q5:CompletableFuture 的组合操作异常怎么处理?

java 复制代码
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("B fail"); });

// thenCombine 中任一异常,组合失败
f1.thenCombine(f2, (a, b) -> a + b)
  .exceptionally(ex -> "fallback");  // 捕获组合中的异常

// allOf:所有完成(无论成功失败)
CompletableFuture.allOf(f1, f2).join();

// anyOf:任意一个完成
CompletableFuture.anyOf(f1, f2).join();

总结

CompletableFuture 与 ThreadLocal 代表了并发编程的两个维度:向外扩展与向内收缩。

  • CompletableFuture 的回调链:用 Completion 链表替代阻塞等待,用 thenXxx 方法实现串行转换/消费/副作用,用 thenCombine/applyToEither 实现并行组合。异常通过 exceptionally/handle/whenComplete 链式处理,但默认会被"吞掉",必须显式注册处理
  • 默认线程池的陷阱:CompletableFuture 默认使用 ForkJoinPool.commonPool(),阻塞操作会耗尽 common pool。生产环境应显式指定自定义线程池
  • ThreadLocal 的线程隔离:每个 Thread 持有 ThreadLocalMap,key 是弱引用的 ThreadLocal,value 是强引用的线程私有数据。key 的弱引用设计是为了让 ThreadLocal 能被 GC,但 value 的强引用导致内存泄漏风险——线程池场景下必须 remove()
  • InheritableThreadLocal 的局限:只在创建线程时复制父线程值,在线程池复用场景下失效。TransmittableThreadLocal 通过"提交时捕获、执行前还原、执行后清理"解决线程池上下文透传

理解这两个机制的关键,是追问:任务在不同线程间流转时,数据和状态是如何传递的? CompletableFuture 通过 Completion 链表让回调在不同线程间接力,ThreadLocal 通过线程私有的 Map 让每个线程拥有独立的数据副本。两者配合使用时,要特别注意异步链中的上下文丢失问题。