一、线程池概述
在Java多线程开发中,线程池是提升程序性能、管理线程生命周期的核心组件。本文将深入剖析5种常用线程池实现,通过完整代码示例演示核心API的使用,并给出企业级应用的最佳实践方案。
二、线程池核心参数解析
在创建自定义线程池前,需要理解线程池的7大核心参数:
new ThreadPoolExecutor(
corePoolSize, // 核心线程数(常驻线程)
maximumPoolSize,// 最大线程数(应急线程)
keepAliveTime, // 空闲线程存活时间
unit, // 时间单位
workQueue, // 任务队列
threadFactory, // 线程工厂
handler // 拒绝策略
);
三、五种线程池实战
1. 单线程线程池(SingleThreadExecutor)
适用场景:需要保证任务顺序执行的场景
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> log.info("顺序任务1"));
executor.execute(() -> log.info("顺序任务2"));
执行流程:
核心线程数=最大线程数=1使用无界队列LinkedBlockingQueue所有任务顺序执行
潜在风险:队列无限增长可能导致内存溢出
2. 固定大小线程池(FixedThreadPool)
适用场景:已知并发量的稳定负载场景
ExecutorService executor = Executors.newFixedThreadPool(5);
IntStream.range(0,10).forEach(i ->
executor.submit(() -> processTask(i)));
线程分配策略:
核心线程数=最大线程数=N使用无界队列LinkedBlockingQueue适用于CPU密集型任务
3. 缓存线程池(CachedThreadPool)
适用场景:短期异步任务或低并发场景
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(() -> handleQuickTask());
特点:
核心线程数=0,最大线程数=Integer.MAX_VALUE空闲线程60秒自动回收使用SynchronousQueue(直接传递队列)
风险提示:可能引发线程爆炸(Thread Explosion)
4. 定时线程池(ScheduledThreadPool)
适用场景:定时/周期性任务调度
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// 延迟执行
scheduler.sdule(() -> task(), 5, TimeUnit.SECONDS);
// 周期性执行
scheduler.scheduleAtFixedRate(() -> log(), 1, 10, TimeUnit.SECONDS);
核心特点:
使用DelayedWorkQueue实现定时支持固定速率/固定延迟两种模式
5. 自定义线程池(推荐方案)
企业级配置建议:
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
5, // 根据CPU核心数设置
10, // 最大应急线程数
30, TimeUnit.SECONDS, // 空闲线程存活时间
new ArrayBlockingQueue<>(100), // 有界队列防止内存溢出
new CustomThreadFactory(), // 自定义线程命名
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
配置解析:
核心线程数 = CPU核心数 + 1队列容量 = 预期最大并发量 × 2推荐使用有界队列+CallerRunsPolicy组合
四、关键API对比
1. execute() vs submit()
方法返回值异常处理适用场景execute()void直接抛出简单任务submit()Future封装在Future中需要结果/异常处理示例代码:
// execute处理异常
executor.execute(() -> {
try {
riskyOperation();
} catch (Exception e) {
log.error("操作失败", e);
}
});
// submit获取异常
Future> future = executor.submit(() -> riskyOperation());
try {
future.get();
} catch (ExecutionException e) {
log.error("任务执行异常", e.getCause());
}
2. 拒绝策略对比
策略类行为描述AbortPolicy直接抛出RejectedExecutionExceptionCallerRunsPolicy由调用线程直接执行任务DiscardPolicy静默丢弃新任务DiscardOldestPolicy丢弃队列最旧任务后重试推荐策略:生产环境建议使用CallerRunsPolicy
五、线程池监控与管理
1. 状态监控方法
// 获取活跃线程数
pool.getActiveCount()
// 获取已完成任务数
pool.getCompletedTaskCount()
// 获取队列积压量
pool.getQueue().size()
2. 优雅关闭方案
void gracefulShutdown(ExecutorService pool) {
pool.shutdown(); // 停止接收新任务
try {
if (!pool.awaitTermination(60, SECONDS)) {
pool.shutdownNow(); // 取消等待任务
if (!pool.awaitTermination(60, SECONDS)) {
log.error("线程池未完全关闭");
}
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
六、最佳实践指南
线程数计算黄金公式:
CPU密集型:N+1(N为CPU核心数)IO密集型:2N+1
队列选择策略:
快速响应:SynchronousQueue(配合最大线程数)流量削峰:LinkedBlockingQueue优先级调度:PriorityBlockingQueue
异常处理原则:
在任务内部捕获所有Checked Exception通过Future.get()处理未捕获异常使用UncaughtExceptionHandler处理运行时异常
生产环境建议:
禁止使用Executors快捷方法推荐自定义线程池集成监控系统(如Micrometer)
七、完整代码示例
项目包含5种线程池实现:
createSingleThreadPool() 单线程池演示createFixedThreadPool() 固定线程池演示createCachedThreadPool() 缓存线程池演示createScheduledThreadPool() 定时任务池演示createCustomThreadPool() 自定义线程池实现
执行结果示例:
================ 自定义线程池任务执行 ================
[INFO] 线程:custom-thread-pool-1,办理业务
[INFO] 线程:custom-thread-pool-2,办理业务
[INFO] 业务办理完成
八、总结
合理使用线程池需要综合考虑以下因素:
任务类型:CPU密集型 vs IO密集型系统资源:内存、CPU核心数业务需求:响应时间、吞吐量要求容错机制:异常处理、降级策略
建议开发者在实际项目中结合APM工具进行线程池监控,根据运行时指标动态调整参数,以达到最优的系统性能。
完整代码
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* 线程池使用示例及实践
* execute:适用于无返回值的任务执行,如果需要返回值需要自己传入FutureTask,在子线程中抛出异常,在主线程捕捉不到
* submit:适用于有返回值的任务执行,不会直接抛出异常,会将异常捕获到FutureTask中,需要通过future.get()获取
*
* @author Devil
* @version 1.0
* @date 2025/4/12 21:42
*/
@Slf4j
public class ThreadPoolDemo {
/**
* 创建并演示使用单个线程的线程池
* 特点:
* 1. 保证所有任务顺序执行
* 2. 使用无界队列(LinkedBlockingQueue),需注意可能的内存溢出问题
* 3. 适用于需要保证任务顺序执行的场景
*/
@SneakyThrows
public static void createSingleThreadPool() {
// 创建单线程线程池(实际开发建议使用自定义ThreadPoolExecutor)
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
log.info("\n================ 普通任务执行 ================");
for (int i = 0; i < 10; i++) {
executorService.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
}
TimeUnit.MILLISECONDS.sleep(2);
log.info("\n================ 带返回值的任务执行 ================");
for (int i = 0; i < 10; i++) {
Future
log.info("线程:{},办理业务", Thread.currentThread().getName());
return "业务办理完成";
});
log.info(future.get());
}
} finally {
// 优雅关闭线程池
gracefulShutdown(executorService);
}
}
/**
* 创建固定大小的线程池
* 特点:
* 1. 固定核心线程数(=最大线程数)
* 2. 使用无界队列(LinkedBlockingQueue),需注意系统资源消耗
* 3. 适用于已知并发需求的稳定负载场景
*/
@SneakyThrows
public static void createFixedThreadPool() {
// 创建固定大小线程池(建议根据CPU核心数设置)
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
log.info("\n================ 普通任务执行 ================");
for (int i = 0; i < 10; i++) {
executorService.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
}
TimeUnit.MILLISECONDS.sleep(1);
log.info("\n================ 带返回值的任务执行 ================");
for (int i = 0; i < 10; i++) {
Future
log.info("线程:{},办理业务", Thread.currentThread().getName());
return "业务办理完成";
});
log.info(future.get());
}
} finally {
gracefulShutdown(executorService);
}
}
/**
* 创建可缓存线程池
* 特点:
* 1. 自动回收空闲线程(60秒)
* 2. 理论上可以创建Integer.MAX_VALUE个线程,需注意线程爆炸问题
* 3. 适用于短期异步任务或低负载场景
*/
@SneakyThrows
public static void createCachedThreadPool() {
// 创建弹性线程池(慎用,可能产生大量线程)
ExecutorService executorService = Executors.newCachedThreadPool();
try {
log.info("\n================ 普通任务执行 ================");
for (int i = 0; i < 10; i++) {
executorService.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
}
TimeUnit.MILLISECONDS.sleep(1);
log.info("\n================ 带返回值的任务执行 ================");
for (int i = 0; i < 10; i++) {
Future
log.info("线程:{},办理业务", Thread.currentThread().getName());
return "业务办理完成";
});
log.info(future.get());
}
} finally {
gracefulShutdown(executorService);
}
}
/**
* 创建定时任务线程池
* 特点:
* 1. 支持定时及周期性任务
* 2. 核心线程数固定,但可以不断创建新线程执行后续任务
* 3. 适用于需要定时执行或周期性执行的场景
*/
@SneakyThrows
public static void createScheduledThreadPool() {
// 创建定时任务线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
try {
log.info("\n================ 延迟任务执行 ================");
for (int i = 0; i < 10; i++) {
scheduledExecutorService.schedule(() ->
log.info("线程:{},办理延迟业务", Thread.currentThread().getName()),
1, TimeUnit.SECONDS);
}
TimeUnit.SECONDS.sleep(2);
log.info("\n================ 带返回值的延迟任务执行 ================");
for (int i = 0; i < 10; i++) {
ScheduledFuture
log.info("线程:{},办理延迟业务", Thread.currentThread().getName());
return "延迟业务办理完成";
}, 1, TimeUnit.SECONDS);
log.info(future.get());
}
} finally {
gracefulShutdown(scheduledExecutorService);
}
}
/**
* 创建自定义线程池
* 特点:
* 1. 支持定时及周期性任务
* 2. 核心线程数固定,但可以不断创建新线程执行后续任务
* 3. 适用于需要定时执行或周期性执行的场景
*/
@SneakyThrows
public static void createCustomThreadPool() {
/*
创建自定义线程池
字段:
1. corePoolSize:核心线程池数量
2. maximumPoolSize: 最大线程池数量
3. keepAliveTime: 线程空闲时间
4. unit: 时间单位
5. workQueue: 阻塞队列
5. threadFactory: 线程工厂
5. handler: 拒绝策略
*/
ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor(
5, // 根据CPU核心数设置
10, // 最大应急线程数
30, TimeUnit.SECONDS, // 空闲线程存活时间
new ArrayBlockingQueue<>(100), // 有界队列防止内存溢出
new DefaultThreadFactory("custom-thread-pool"), // 自定义线程命名
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
try {
log.info("\n================ 普通任务执行 ================");
for (int i = 0; i < 20; i++) {
customthreadPoolExecutor.execute(() -> log.info("线程:{},办理业务", Thread.currentThread().getName()));
}
TimeUnit.MILLISECONDS.sleep(1);
log.info("\n================ 带返回值的任务执行 ================");
for (int i = 0; i < 10; i++) {
Future
log.info("线程:{},办理业务", Thread.currentThread().getName());
return "业务办理完成";
});
log.info(future.get());
}
} finally {
gracefulShutdown(customthreadPoolExecutor);
}
}
/**
* 优雅关闭线程池通用方法
*
* @param pool 需要关闭的线程池
*/
private static void gracefulShutdown(ExecutorService pool) {
pool.shutdown(); // 拒绝新任务提交
try {
// 等待现有任务完成
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务
// 再次等待任务响应中断
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("线程池未完全关闭");
}
}
} catch (InterruptedException e) {
// 重新尝试关闭
pool.shutdownNow();
Thread.currentThread().interrupt();
} finally {
log.info("线程池是否执行完成:{}", pool.isTerminated());
}
}
public static void main(String[] args) {
// 测试不同线程池(选择其中一个执行)
// createSingleThreadPool();
// createFixedThreadPool();
// createCachedThreadPool();
// createScheduledThreadPool();
createCustomThreadPool();
}
}