原创

关于线程池ThreadPoolExecutor中多余的线程是如何回收的理解

温馨提示:
本文最后更新于 2021年08月06日,已超过 333 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

这里涉及到线程池的执行策略,建议先看一下线程池ThreadPoolExecutor框架

我在网上看到的原话是这样的:线程池中多余的线程是如何回收的?

听说这是一道面试题,我当时就惊了,这不就是相当于问源码吗???好家伙,现在ThreadPoolExecutor框架都问的这么深了吗?

莫慌。

下面一起来简单分析ThreadPoolExecutor回收工作线程的原理。

Step1

来个例子:

public class ThreadPoolALL {
    public static void main(String[] args) {
        new ThreadPoolALL().ExecutorThreadPool();
    }

    static class TestRunnable implements Runnable {
        static int i = 0;
        @Override
        public void run() {
            synchronized (this) {
                int count = getCount();
                System.out.println(Thread.currentThread().getName() + "  线程被调用了。第" + count + "次");
            }
        }

        public static int getCount() {
            return ++i;
        }
    }

    /**
     * ThreadPoolExecutor是ExecutorSerivce接口的具体实现
     * 即ExecutorSerivce最后也是调用ThreadPoolExecutor的
     * ThreadPoolExecutor提供了很多参数
     * 阿里开发手册建议使用这种方法创建线程池
     */
    public void ExecutorThreadPool() {
        //自定义线程池
        int corePoolSize = 2; //线程池维护线程的最少数量
        int maxPoolSize = 3; //线程池维护线程的最大数量
        long keepAliveTime = 10; //线程池维护线程所允许的空闲时间(解释:当线程池的数量超过corePoolSize时,多余的空闲线程的存活时间。)
        RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
        BlockingDeque<Runnable> queue = new LinkedBlockingDeque<>(7);
        //或者
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                SecurityManager s = System.getSecurityManager();
                ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
                Thread t = new Thread(group, r);
                //设置优先级
                if (t.getPriority() != Thread.NORM_PRIORITY) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                //设置错误处理
                t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        //自定义处理错误
                        System.out.println("factory捕获了错误--->>>" + t.getName() + e);
                    }
                });
                return t;
            }
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue, factory, rejectedExecutionHandler);
//这是不使用shutdown回收线程的替代方法。
// 在allowCoreThreadTimeOut设置为true时,ThreadPoolExecutor的keepAliveTime参数必须大于0。
        executor.allowCoreThreadTimeOut(false);

        //1、Runnable方法起线程
        TestRunnable testRunnable = new TestRunnable();
        //起多少个线程
        for (int i = 0; i < 10; i++) {
            executor.execute(testRunnable);
        }
//        executor.shutdown();
    }
}

:warning:注意:这里是注释了executor.shutdown() 而且 executor.allowCoreThreadTimeOut(false) 设置为false (默认是false)

输出:

Thread-0  线程被调用了。第1次
Thread-0  线程被调用了。第2次
Thread-0  线程被调用了。第3次
Thread-0  线程被调用了。第4次
Thread-1  线程被调用了。第5次
Thread-1  线程被调用了。第6次
Thread-1  线程被调用了。第7次
Thread-1  线程被调用了。第8次
Thread-2  线程被调用了。第9次
Thread-0  线程被调用了。第10次

但是你看到控制台是没有退出的,IDEA 执行完了之后也是没有退出的:

Step2

我们用java命令工具看一下有哪些进程:

C:\Users\HaC>jps
12192
21992 KotlinCompileDaemon
3176 Launcher
10076 Launcher
20476 Jps
20780 ThreadPoolALL
7084

可以看到 20780 ThreadPoolALL这个java线程一直存活 ,

再查看一下这个进程有什么线程:(需要马上查看,因为设置了keepAliveTime)

C:\Users\HaC> jstack 20780
2021-03-23 14:38:03
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"DestroyJavaVM" #17 prio=5 os_prio=0 tid=0x0000000002331000 nid=0x5188 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
"Thread-2" #16 prio=5 os_prio=0 tid=0x0000000020054000 nid=0x3ff8 waiting on condition [0x00000000219fe000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
        at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

"Thread-1" #15 prio=5 os_prio=0 tid=0x0000000020053800 nid=0x4e70 waiting on condition [0x00000000217df000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
        at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

"Thread-0" #14 prio=5 os_prio=0 tid=0x00000000200c8000 nid=0x5408 waiting on condition [0x000000002162e000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
        at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
.....................省略

可以看到有三个线程 Thread-1 、Thread-2、Thread-0 在 TIMED_WAITING 状态。

step3

我在代码中设置了keepAliveTime =10 秒,10秒过完后,再来看看这个线程是否还在:

C:\Users\HaC> jstack 20780
2021-03-23 14:40:17
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"DestroyJavaVM" #17 prio=5 os_prio=0 tid=0x0000000002331000 nid=0x5188 waiting on condition [0x000
0000000000000]
   java.lang.Thread.State: RUNNABLE

"Thread-2" #16 prio=5 os_prio=0 tid=0x0000000020054000 nid=0x3ff8 waiting on condition [0x00000000
219fe000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSy
nchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQue
uedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
        at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

"Thread-0" #14 prio=5 os_prio=0 tid=0x00000000200c8000 nid=0x5408 waiting on condition [0x00000000
2162e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076bcee280> (a java.util.concurrent.locks.AbstractQueuedSy
nchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQue
uedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
        at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

可以看到只有两个线程 Thread-2、Thread-0 在 TIMED_WAITING 状态了。

这也验证了:

如果你设置了keepAliveTime 且设置executor.allowCoreThreadTimeOut(false),当线程池的数量超过corePoolSize时,超过的部分,即多余的空闲线程的存活时间超过keepAliveTime就会被回收。

即maxPoolSize-corePoolSize的部分

如果你executor.allowCoreThreadTimeOut(true),再执行一下,等待10s后,发现线程自动退出了。

Process finished with exit code 0

说明executor.allowCoreThreadTimeOut(true) 会把corePoolSize的线程也退出了。

下面再来演示一下executor.shutdown();,我们放开注释,再设置executor.allowCoreThreadTimeOut(false)

可以看到控制台马上退出了,并不会等待10s这么长。


总结一下:

线程退出(回收)有两种方法:

1、参数allowCoreThreadTimeOut为true,等待keepAliveTime后,线程就会被回收。

2、加上executor.shutdown();马上将线程池中的空闲线程回收。该方法会使得keepAliveTime参数失效。


这里是拓展阅读:

线程池执行过程分析

续上以上的代码,线程池开始执行的入口是这里:

executor.execute(testRunnable);

顺藤摸瓜跳过去看一下这个execute方法:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里有一大段注释,我在网上找了一张图,意思大概就是:

文字版本:

  1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
  3. 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
  4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
  5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
  6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

既然程序已经执行完了,为什么线程还没退出呢?

因为线程池的引用被它的内部类 Worker 持有了。而 Worker 和线程一一对应,是对 Thread 的增强,所以本质上就是因为线程没有被释放。

这里不详细展开线程池的执行原理,需要的可以参考:https://www.jianshu.com/p/edab547f2710

回收线程的核心在runWorker()方法的getTask()processWorkerExit()

final void runWorker(Worker w) {
    boolean completedAbruptly = true;
    ...
    try {
        while (getTask()...) {
            ...
            处理任务
        }
        //该线程已经从队列中取不到任务了,改变标记,该标记表示:该线程是否因用户因素导致的异常而终止
         completedAbruptly = false;
    } finally {
        //线程移除
        processWorkerExit(w, completedAbruptly);
    }
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

精力有限,简单顺着代码分析,最后线程池在执行 shutdown 方法 或 allowCoreThreadTimeOut方法 时,会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;

详细的可以参考:

https://blog.csdn.net/weixin_39633954/article/details/111579295.

https://www.jianshu.com/p/edab547f2710


参考这里的总结:https://www.cnblogs.com/kingsleylam/p/11241625.html

4. 总结

ThreadPoolExecutor回收工作线程,一条线程getTask()返回null,就会被回收。

分两种场景。

4.1 未调用shutdown() ,RUNNING状态下全部任务执行完成的场景

线程数量大于corePoolSize,线程超时阻塞,超时唤醒后CAS减少工作线程数,如果CAS成功,返回null,线程回收。否则进入下一次循环。当工作者线程数量小于等于corePoolSize,就可以一直阻塞了。

4.2 调用shutdown() ,全部任务执行完成的场景

shutdown() 会向所有线程发出中断信号,这时有两种可能。

4.2.1 所有线程都在阻塞

中断唤醒,进入循环,都符合第一个if判断条件,都返回null,所有线程回收。

4.2.2 任务还没有完全执行完

至少会有一条线程被回收。在processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用tryTerminate(),向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。

正文到此结束
关注公众号 【HelloCoder】
免费领取Java学习资料
让技术,化繁为简
本文目录