余瑜的博客 余瑜的博客
首页
  • 并发
  • 线程池
  • spring
  • maven
  • 其他
  • redis
  • mysql
  • linux
  • zookeeper
  • docker
  • terminal
  • kong插件开发
  • 资料
  • leetCode-简单
  • blog
  • 其他
关于
GitHub (opens new window)
首页
  • 并发
  • 线程池
  • spring
  • maven
  • 其他
  • redis
  • mysql
  • linux
  • zookeeper
  • docker
  • terminal
  • kong插件开发
  • 资料
  • leetCode-简单
  • blog
  • 其他
关于
GitHub (opens new window)
  • 并发

  • 线程池

    • 浅谈JAVA线程池工作原理
    • 浅谈JAVA线程返回值工作原理
    • FutureTask
    • ThreadPoolExecutor原理
    • 线程池顶级类及其接口
      • 集成关系
      • Executor接口
      • ExecutorService接口
      • AbstractExecutorService类
    • ThreadPoolExecutor源码
    • ExecutorCompletionService源码
  • spring

  • maven

  • 其他

  • JAVA
  • 线程池
余瑜
2020-02-27
目录

线程池顶级类及其接口

转载自《线程池系列三》-线程池顶级类及其接口源码分析 (opens new window)

在了解ThreadPoolExecutor之前,我们必须先弄清楚其父类及其父接口的定义,了解这部分知识后,我们也可以自定义自己的执行器实现。

# 集成关系

首先看一下ThreadPoolExecutor的集成关系,如下所示:

public interface Executor 
public interface ExecutorService extends Executor
public abstract class AbstractExecutorService implements ExecutorService 
public class ThreadPoolExecutor extends AbstractExecutorService 
1
2
3
4

# Executor接口

该接口表示可执行的接口,只提供了一个方法execute(Runnable) 需要一个Runnable对象的参数,代码如下:

void execute(Runnable command);
1

# ExecutorService接口

该接口继承了Executor接口,并添加了执行器(这里指的是实现ExecutorService的类,不一定是线程池)相关的操作,比如关闭任务、提交任务、等待执行器关闭、执行器状态的判断等操作。

  • 提交任务操作
    submit(),提供了三种提交任务的方式,不管是哪一种提交操作,都是将传入的参数转换为RunnableFuture对象
  1. callable对象 在线程池中就是封装为FutureTask
  2. runnable对象 调用RunnableFuture的get()方法时,返回值为null
  3. runnable对象 + 泛型返回值T 调用RunnableFuture的get()方法时,返回值为T的值

其源码如下:

<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
1
2
3
  • 执行器的关闭操作
  1. shutdown() 关闭操作
  2. shutdownNow() 关闭操作,与shutdown()的返回值不同,其他的区别在讲解源码时进行分析
  3. awaitTermination() 等待执行器关闭,等待最长时间为参数时间

其源码如下:

void shutdown();
List<Runnable> shutdownNow();  
boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
1
2
3
  • 判断执行器状态操作
  1. isShutdown() 判断状态是否为关闭装填
  2. isTerminated() 判断执行器是否已经完全终止

其源码如下:

boolean isShutdown();
boolean isTerminated();
1
2
  • 批量执行任务操作
    与submit操作不同,该类型的任务是需要等等待任务执行完才会返回
  1. invokeAll(Collection) 等待提交的所有的任务都执行完,或者可以指定最长等待时间
  2. invokeAny(Collection) 执行完一个任务就可以返回,也可以制定最长的等待时间

这两种方法的实现是在AbstractExecutorService类中实现的,两者的具体含义在讲解源码时进行分析,其方法定义源码如下:

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
          throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  long timeout, TimeUnit unit)         
         throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) 
         throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
        throws InterruptedException, ExecutionException, TimeoutException;
1
2
3
4
5
6
7
8

# AbstractExecutorService类

该类重要实现了ExecutorService中的一些公用方法,例如提交操作、批量执行任务操作,除此之外,还提供了讲Callable对象和Runnable对象转化为RunnableFuture的操作。

  • 任务转化操作, 方法为protected方法,只能子类可以调用
    都是调用的FutureTask的构造方法,可以看出该类中返回的RunnableFuture类型都是FutureTask类型
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
1
2
3
4
5
6
  • 提交任务操作submit
    这里只看一个方法就可以,思想就是将Runnable+T返回值,或者Callable对象转化为FutureTask类型,然后调用execute(Runnable)方法(FutureTask继承了Runnable接口,也是Runnable对象),返回FutureTask对象。其源代码如下:
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
1
2
3
4
5
6
  • 批量处理任务操作

(1) invokeAny方法
功能:提交多个任务,只有一个任务正常执行完成(异常不算正常完成)即可返回(但是任务的执行并不是一次性全部启动执行的)
阻塞方法invokeAny(Colletion)和非阻塞方法invokeAny(Colletion, timeout, unit)都调用doInvokeAny()方法
其执行逻辑如下:

  1. 先执行一个任务
  2. 检查任务有没有执行完成,如果执行完成,并且是正常执行完,则直接返回结果
  3. 如果任务没有执行完成,并且还有可执行的任务,那么再启动一个任务,再次检查有没有任务执行完,循环2,3步
  4. 如果没有可执行任务了,只能等待任务执行,调用take()阻塞方法

具体的流程参考源码,源码中有详细的注释,源码如下:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                        boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    //检查tasks集合参数
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    //用于存放提交任务的返回结果值
    List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
    //定义ExecutorCompletionService,讲this作为其构造参数,因为要用该执行器执行任务
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        ExecutionException ee = null;
        //判断超时时间
        long lastTime = timed ? System.nanoTime() : 0;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // 提交第一个任务
        futures.add(ecs.submit(it.next()));
        //记录任务数
        --ntasks;
       //记录正在执行的任务数量
        int active = 1;

        for (;;) {
            //注意:这里使用的是poll()非阻塞方法(没有执行完的任务时,返回null),而不是take()
            Future<T> f = ecs.poll();
            //说明没有执行完成的任务
            if (f == null) {
                //如果还存在可执行的任务,就添加任务执行
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                //所有的任务都不是正常结束的
                else if (active == 0)
                    break;
                //如果设定了超时,计算是否超时,如果已超时,抛出超时异常
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                //所有任务都提交了,等待有执行完成的任务
                else
                    f = ecs.take();
            }
            // 存在已经执行完成的任务
            if (f != null) {
                --active;
                try {
                    //返回执行任务结果
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }
        //执行到这说明,没有通过return返回结果,也就是出现了问题,抛出异常
        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        //取消所有的任务
        for (Future<T> f : futures)
            f.cancel(true);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

从该方法中可以看出,任务并不是一次性启动的,然是先启动一个,判断有没有结束,如果没有结束那么再启动一个任务,如果已经结束则返回
再看invokeAny(Collection<? extends Callable<T>> tasks)阻塞方法,其源码如下,只是简单的调用了doInvokeAny()方法,设置不超时:

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
1
2
3
4
5
6
7
8
9

那么invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)非阻塞方法,也是简单调用了doInvokeAny()方法而已,其源码如下:

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
1
2
3
4
5

(2) invokeAll方法
功能:执行完所有的任务才会返回
其执行逻辑:

  1. 执行所有的任务
  2. 等待所有任务执行完

详细的注释在源码中,源码如下:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    //参数检查
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
         //一次性启动并执行所有的任务
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
       //判断任务有没有执行完,如果没有执行完,则等待
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    //get()方法为阻塞方法,只有执行完成才会返回
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        //表明所有的任务都已经执行完成,但是不能保证是否正常执行完
        done = true;
        return futures;
    } finally {
        //如果是异常终止,那么取消所有的任务
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

该方法不同于invokeAny, 该方法是一次性启动执行所有的任务,而且不管任务是否执行成功,执行完成即可
invokeAll(Collection<? extends Callable<T>> task, long timeout, TimeUnit unit)方法只是简单的条件了判断超时的逻辑,在调用get方法时,不在是调用阻塞get(),而是调用可超时的get(timeout)方法,其源码如下:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    if (tasks == null || unit == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        //先将任务都放到list中,返回的list不管超时与否都是需要返回的
        //所以先把任务都加到list中
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        long lastTime = System.nanoTime();

        Iterator<Future<T>> it = futures.iterator();
        //在启动执行过程中,也判断是否超时
        while (it.hasNext()) {
            execute((Runnable)(it.next()));
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0)
                return futures;
        }

        for (Future<T> f : futures) {
            if (!f.isDone()) {
                if (nanos <= 0)
                    return futures;
                try {
                    //每次都计算超时时间,调用非阻塞的get()方法
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

至此,ThreadPoolExecutor其父类和接口的源码已经分析完了,而线程池的源码分析才刚刚开始,还需要耐心学习下去。

上次更新: 2021/02/20, 20:09:17

← ThreadPoolExecutor原理 ThreadPoolExecutor源码→

Theme by Vdoing | Copyright © 2018-2022 逆光世间 | 备案号: 京ICP备19016086号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式