线程池的自我介绍
为什么要使用线程池
- 反复创建线程开销大
- 过多的线程会占用太多内存
线程池的好处
- 加快响应速度
- 合理利用 cpu 和内存
- 统一管理资源
线程池适合应用的场合
- 服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
- 实际上,在开发中,如果需要创建 5 个以上的线程,那么就可以使用线程池来管理
创建和停止线程池
线程池构造函数的参数

参数中的 corePoolSize 和 maxPoolSize
maximumPoolSize 的说明
我们可以将 maximumPoolSize 和 maxPoolSize 认识是相同的,因为在 ThreadPoolExecutor 类的参数中,变量名是 maximumPoolSize;不过在 org.springframework.sheduling.concurrent包的 ThreadPoolExecutorFactoryBean 类等其他类中,也有使用 maxPoolSize 作为参数名的情况。
- corePoolSize:线程池维护线程的最少数量
- maxPoolSize:线程池维护线程的最大数量
线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
添加线程规则
- 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
- 如果线程数等于(或大于)corePoolSize 但少于 maximumPoolSize,则将任务放入队列
- 如果队列已满,并且线程数小于 maxPoolSize,则创建一个新线程来运行任务
- 如果队列已经满,并且线程数大于或等于 maxPoolSize,则拒绝该任务

是否需要增加线程的判断顺序是
- corePoolSize
- workQueue
- maxPoolSize
增减线程的特点
- 通过设置 corePoolSize 和 maximumPoolSize 相同,就可以创建固定大小的线程池
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它
- 通过设置 maximumPoolSize 为很高的值,例如 Integer.MXA_VALUE,可以允许线程池容纳任意数量的并发任务
- 是只有在队列填满时才创建多于 corePoolSize 的线程,所以如果你使用的是无界队列(例如 LinkedBlockingQueue),那么线程数就不会超过 corePoolSize
keepAliveTime
如果线程池当前的线程数多于 corePoolSize,那么如果多余的线程空闲时间超过 keepAliveTime,它们就会被终止。
ThreadFactory 用来创建线程
新的线程是由 ThreadFactory 创建的,默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的 NORM_PRIORITY 优先级并且都不是守护线程。如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。
workQueue
有3 种最常见的队列类型:
- 直接交换:SynchronousQueue(常用于:任务通过队列中转交给线程,队列没有容量,存不下任务)
- 无界队列:LinkedBlockingQueue(不会被塞满,maximumPoolSize 参数变得没有意义)
- 有界队列:ArrayBlockingQueue
线程池应该手动创建还是自动创建
手动创建更好,因为这样可以让我们更明确线程池的运行规则,避免资源耗尽的风险
根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等
自动创建线程池(也就是直接调用 JDK 封装好的构造函数)可能带来哪些问题
- newFixedThreadPool
- 由于传进去的 LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致 OOM
- newSingleThreadExecutor
- 和newFixedThreadPool 的原理基本一样,只不过把线程数直接设置成了 1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存
- newCachedThreadPool
- 可缓存线程池
- 特点:无界线程池,具有自动回收多于线程的功能
- 弊端:第二个参数 maximumPoolSize 被设置为 Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致 OOM
线程池里的线程数量设定为多少比较合适?
- CPU 密集型(加密、计算 hash 等):最佳线程数为 CPU 核心数的 1-2 倍左右
- 耗时 IO 型(读取数据库、文件、网络读写等):最佳线程数一般会大于 cpu 核心数很多倍,以 JVM 线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐计算方法
- 线程数 = CPU 核心数*(1 + 平均等待时间/平均工作时间)
停止线程池的正确方法
- shutdown(当前任务不会结束,但是新的任务进不来)
- isShutdown:线程池是否停止
- isTerminated:线程是否完全终止(包括正在执行的任务、队列中的任务)
- awaitTermination:等待一段时间之内,检测线程是否终止
- shutdownNow
常见线程池的特点和用法
- FixedThreadPool
- 固定线程数
- CachedThreadPool
- 可缓存线程池
- 具有自动回收多余线程的功能
- ScheduleThreadPool
- 支持定期/周期执行任务
- SingleThreadExecutor
- 单线程的线程池:它只会用唯一的工作线程来执行任务
- workStealingPool(JDK1.8加入的)
- 这个线程池和之前的都有很大不同
- 子任务(树的遍历、矩阵)
- 窃取(假设共有三个线程同时执行, A, B, C,当A,B线程池尚未处理任务结束,而C已经处理完毕,则C线程会从A或者B中窃取任务执行,这就叫工作窃取)

任务太多,怎么拒绝?
4 种拒绝策略
- AbortPolicy(抛出异常)
- DiscardPolicy(不会得到通知)
- DiscardOldestpolicy(丢掉最老的)
- CallerRunsPolicy(让提交任务的线程去执行)
钩子方法,给线程池加点料
作用
- 每个任务执行前后
- 日志、统计 代码实例
/**
* @Author: ye
* @Date: 2020-01-17 12:57
* @Description: 演示每个任务执行前后都可以放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
// 线程池状态
private boolean isPaused;
// 上锁
private final ReentrantLock lock = new ReentrantLock();
// Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效.
private Condition unpaused = lock.newCondition();
// 实现父类的方法
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
// 钩子方法
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 线程池暂停
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
// 线程池重新开始
private void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}