Java

线程池

勤劳的小蜜蜂 · 1月17日 · 2020年 ·

线程池的自我介绍

为什么要使用线程池

  • 反复创建线程开销大
  • 过多的线程会占用太多内存

线程池的好处

  • 加快响应速度
  • 合理利用 cpu 和内存
  • 统一管理资源

线程池适合应用的场合

  • 服务器接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
  • 实际上,在开发中,如果需要创建 5 个以上的线程,那么就可以使用线程池来管理

创建和停止线程池

线程池构造函数的参数

参数中的 corePoolSize 和 maxPoolSize

maximumPoolSize 的说明

我们可以将 maximumPoolSize 和 maxPoolSize 认识是相同的,因为在 ThreadPoolExecutor 类的参数中,变量名是 maximumPoolSize;不过在 org.springframework.sheduling.concurrent包的 ThreadPoolExecutorFactoryBean 类等其他类中,也有使用 maxPoolSize 作为参数名的情况。

  • corePoolSize:线程池维护线程的最少数量
  • maxPoolSize:线程池维护线程的最大数量

线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

添加线程规则

  1. 如果线程数小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
  2. 如果线程数等于(或大于)corePoolSize 但少于 maximumPoolSize,则将任务放入队列
  3. 如果队列已满,并且线程数小于 maxPoolSize,则创建一个新线程来运行任务
  4. 如果队列已经满,并且线程数大于或等于 maxPoolSize,则拒绝该任务 

是否需要增加线程的判断顺序是

  1. corePoolSize
  2. workQueue
  3. maxPoolSize
增减线程的特点
  1. 通过设置 corePoolSize 和 maximumPoolSize 相同,就可以创建固定大小的线程池
  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它
  3. 通过设置 maximumPoolSize 为很高的值,例如 Integer.MXA_VALUE,可以允许线程池容纳任意数量的并发任务
  4. 是只有在队列填满时才创建多于 corePoolSize 的线程,所以如果你使用的是无界队列(例如 LinkedBlockingQueue),那么线程数就不会超过 corePoolSize

keepAliveTime

如果线程池当前的线程数多于 corePoolSize,那么如果多余的线程空闲时间超过 keepAliveTime,它们就会被终止。

ThreadFactory 用来创建线程

新的线程是由 ThreadFactory 创建的,默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的 NORM_PRIORITY 优先级并且都不是守护线程。如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

workQueue

有3 种最常见的队列类型:

  1. 直接交换:SynchronousQueue(常用于:任务通过队列中转交给线程,队列没有容量,存不下任务)
  2. 无界队列:LinkedBlockingQueue(不会被塞满,maximumPoolSize 参数变得没有意义)
  3. 有界队列:ArrayBlockingQueue

线程池应该手动创建还是自动创建

手动创建更好,因为这样可以让我们更明确线程池的运行规则,避免资源耗尽的风险

根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等

自动创建线程池(也就是直接调用 JDK 封装好的构造函数)可能带来哪些问题

  • newFixedThreadPool
    • 由于传进去的 LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致 OOM
  • newSingleThreadExecutor
    • 和newFixedThreadPool 的原理基本一样,只不过把线程数直接设置成了 1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存
  • newCachedThreadPool
    • 可缓存线程池
    • 特点:无界线程池,具有自动回收多于线程的功能
    • 弊端:第二个参数 maximumPoolSize 被设置为 Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致 OOM

线程池里的线程数量设定为多少比较合适?

  • CPU 密集型(加密、计算 hash 等):最佳线程数为 CPU 核心数的 1-2 倍左右
  • 耗时 IO 型(读取数据库、文件、网络读写等):最佳线程数一般会大于 cpu 核心数很多倍,以 JVM 线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐计算方法
  • 线程数 = CPU 核心数*(1 + 平均等待时间/平均工作时间)

停止线程池的正确方法

  1. shutdown(当前任务不会结束,但是新的任务进不来)
    • isShutdown:线程池是否停止
    • isTerminated:线程是否完全终止(包括正在执行的任务、队列中的任务)
    • awaitTermination:等待一段时间之内,检测线程是否终止
  2. shutdownNow

常见线程池的特点和用法

  • FixedThreadPool
    • 固定线程数
  • CachedThreadPool
    • 可缓存线程池
    • 具有自动回收多余线程的功能
  • ScheduleThreadPool
    • 支持定期/周期执行任务
  • SingleThreadExecutor
    • 单线程的线程池:它只会用唯一的工作线程来执行任务 
  • workStealingPool(JDK1.8加入的)
    • 这个线程池和之前的都有很大不同
    • 子任务(树的遍历、矩阵)
    • 窃取(假设共有三个线程同时执行, A, B, C,当A,B线程池尚未处理任务结束,而C已经处理完毕,则C线程会从A或者B中窃取任务执行,这就叫工作窃取)

任务太多,怎么拒绝?

4 种拒绝策略

  • AbortPolicy(抛出异常)
  • DiscardPolicy(不会得到通知)
  • DiscardOldestpolicy(丢掉最老的)
  • CallerRunsPolicy(让提交任务的线程去执行)

钩子方法,给线程池加点料

作用

  • 每个任务执行前后
  • 日志、统计 代码实例
/**
 * @Author: ye
 * @Date: 2020-01-17 12:57
 * @Description: 演示每个任务执行前后都可以放钩子函数
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    // 线程池状态
    private boolean isPaused;

    // 上锁
    private final ReentrantLock lock = new ReentrantLock();

    // Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效.
    private Condition unpaused = lock.newCondition();

    // 实现父类的方法
    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    // 钩子方法
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();

        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 线程池暂停
    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    // 线程池重新开始
    private void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");
    }
}
0 条回应