源代码解析

public class ThreadPoolExecutor extends AbstractExecutorService {
  // 记录当前线程池的状态(0|x=x),所以默认就RUNNING状态
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  // 32-3=29 -> 11100
  private static final int COUNT_BITS = Integer.SIZE - 3;
  // 00011111 11111111 11111111 11111111
  private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

  // 线程池的运行状态保存在高3位的bit中
 	// 11100000 00000000 00000000 00000000
  private static final int RUNNING    = -1 << COUNT_BITS;
  // 00000000 00000000 00000000 00000000
  private static final int SHUTDOWN   =  0 << COUNT_BITS;
  // 00100000 00000000 00000000 00000000
  private static final int STOP       =  1 << COUNT_BITS;
  // 01000000 00000000 00000000 00000000
  private static final int TIDYING    =  2 << COUNT_BITS;
  // 01100000 00000000 00000000 00000000
  private static final int TERMINATED =  3 << COUNT_BITS;
  
  // 将COUNT_MASK取反,为11100000 00000000 00000000 00000000,
  // 然后(c&11100000 00000000 00000000 0000000),就取出了c中的前三位,后面的位都为0,即线程池的运行状态
  private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
  // 计算worker总数(c & 00011111 11111111 11111111 11111111),高3位都为0,后面的位都是c上的值
  // AtomicInteger ctl的后29位保存的是当前的worker数量
  private static int workerCountOf(int c)  { return c & COUNT_MASK; }
  // 计算当前的线程池状态
  private static int ctlOf(int rs, int wc) { return rs | wc; }
  
  // 通过CAS自增worker的数量
  private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
  }
  // 通过CAS递减worker的数量
  private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
  }
  
  // 阻塞队列,用于保存来不及处理的任务
  private final BlockingQueue<Runnable> workQueue;
  
  // 主锁
  private final ReentrantLock mainLock = new ReentrantLock();
  
  // 包含池中所有工作线程的集合。仅在持有主锁时访问。
  private final HashSet<Worker> workers = new HashSet<>();
  
  // 等待条件,以支持等待终止。
  private final Condition termination = mainLock.newCondition();
  
  // 跟踪获得的最大池大小。仅在主锁下访问。
  private int largestPoolSize;
  
  // 已完成任务的计数器。仅在工作线程终止时更新。仅在主锁下访问。
  private long completedTaskCount;
  
  // =======接下来是用户控制的参数,都使用volatile进行生命,可以保证在不进行加锁的情况下获取到最新值=======
  // 所有线程都是使用这个工厂创建的(通过addWorker方法)
  private volatile ThreadFactory threadFactory;
  private volatile RejectedExecutionHandler handler;
  private volatile long keepAliveTime;
  private volatile boolean allowCoreThreadTimeOut;
  private volatile int corePoolSize;
  private volatile int maximumPoolSize;
  // =========================================================
  
  // 创建了默认的拒绝策略
  private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  
  // Worker,内部类,继承了AQS,具体代码看下文
  private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  	...
  }
  
  // 构造函数
  public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
  }
  
  // 构造函数(可以传入自定义的拒绝策略)
  public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
  }
  
  // 提交任务
  public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
  }
  
  // 执行任务
  public void execute(Runnable command) {
    if (command == null)
      throw new NullPointerException();
    int c = ctl.get();
    // 如果线程数小于核心线程数,则创建线程来执行任务
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
        return;
      c = ctl.get();
    }
    // 如果线程数大于核心线程数,就将任务放在任务队列中
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      // 一般不会执行以下内容
      // 再次检查是否应该添加一个线程(因为现有的线程在上次检查后死亡)
      // 如果被线程池被停止了,则回滚队列;如果没有线程,则启动一个新线程(非核心线程)
      if (! isRunning(recheck) && remove(command))
        reject(command);
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
    // 如果线程数大于核心线程数 && 任务队列已满,则执行拒绝策略
    else if (!addWorker(command, false))
      reject(command);
  }
  
  // 以下是线程实际运行的代码
  final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 获取当前线程
    Runnable task = w.firstTask; // 拿到第一个运行任务
    w.firstTask = null; // 将执行第一个任务的引用置空
    w.unlock(); // 将status设置为0,允许被中断
    boolean completedAbruptly = true; // 突然完成的标识位
    try {
      // 任务不为null || 获取下一个的任务不为null
      // 符合其中一项就继续循环,即直到队列当中的任务为空时,才退出循环
      while (task != null || (task = getTask()) != null) {
        w.lock(); // 加锁
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted.  This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
          wt.interrupt();
        try {
          // 执行任务之前的回调
          beforeExecute(wt, task);
          try {
            // 调用任务的run方法
            task.run();
            // 执行任务之后的回调
            afterExecute(task, null);
          } catch (Throwable ex) {
            afterExecute(task, ex);
            throw ex;
          }
        } finally {
          task = null; // 将任务引用置空
          w.completedTasks++; // 递增完成任务总数
          w.unlock(); // 释放锁
        }
      }
      completedAbruptly = false;
    } finally {
      // 终止线程
      processWorkerExit(w, completedAbruptly);
    }
  }
  
  // 从队列中获取任务
  private Runnable getTask() {
    boolean timedOut = false; // 标记线程空闲的时间是否超时,默认为false

    // 死循环
    for (;;) {
      int c = ctl.get(); //拿到AtomicInteger ctl的值,该值包含了线程池的状态和线程数量

      // 仅在必要时检测队列是否为空
      // 线程池不处于运行状态 && ( 线程池处于TIDYING或者TERMINATED状态 || 队列为空 )
      if (runStateAtLeast(c, SHUTDOWN)
          && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
        decrementWorkerCount(); // 递减线程总数(当返回null后,该线程就会在processWorkerExit()方法中被中断)
        return null; // 任务返回null
      }

      // 如果上述条件都不符合,获取当前线程的总数
      int wc = workerCountOf(c);

      // allowCoreThreadTimeOut为是否允许核心线程在空闲状态还存活,默认为false
      // 是否允许核心线程在空闲状态还存活(默认false) || 当前线程数是否大于核心线程数
      // 最终取决于当前线程数是否大于核心线程数,如果大于核心线程数,则timed=true,否则为false
      boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

      // 因为timedOut默认为false,所以第一次timed && timedOut为false(从第二次循环开始,timedOut将会被置为true)
      // 当第二次循环时,就取决于timed的值,如果当前线程数是否大于核心线程数,则会将当前的空闲线程中断
      // ( (线程数>最大线程数 || false) && (线程数大于1 || 队列为空) )
      // 即主要看线程数是否大于最大线程数 和 队列是否为空
      if ((wc > maximumPoolSize || (timed && timedOut))
          && (wc > 1 || workQueue.isEmpty())) {
        // 如果递减线程总数成功,则返回null(当返回null后,该线程就会在processWorkerExit()方法中被中断)
        if (compareAndDecrementWorkerCount(c))
          return null;
        // 如果上述情况都不符合,则线程继续死循环(空转),判断自己是否要被中断
        //(在主线程会不断的丢任务到队列中,如果一旦队列不为空,则会返回对应的任务继续执行)
        continue;
      }

      try {
        // 如果当前线程数是否大于核心线程数,则阻塞的获取任务并设定超时时间,如果超时,则返回null
        // 如果当前线程数是否小于核心线程数,就一直阻塞的获取任务,直到有任务为止
        Runnable r = timed ?
          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
        // 如果任务不为null,则返回任务
        if (r != null)
          return r;
        // 否则,将timedOut置为true(标记为超时),继续死循环
        timedOut = true;
      } catch (InterruptedException retry) {
        timedOut = false;
      }
    }
  }

  ...
  
}

内部类Worker的源代码:

// Worker内部类,继承了AQS,能够在执行Runnable时简化获取和释放锁的操作,
// 其旨在唤醒正在阻塞的Worker,而不是打断正在执行任务的Worker,通过AQS实现了一个互斥锁,
// 为了防止当调用例如setCorePoolSize()方法时Worker可能获取到锁,
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  // 保存正在运行的线程的引用,如果为null,则表示ThreadFactory出错了
  final Thread thread;
  // 需要运行的任务
  Runnable firstTask;
  // 记录每个线程完成的任务数
  volatile long completedTasks;

  // 从ThreadFatory创建线程,并保存任务的引用
  Worker(Runnable firstTask) {
    setState(-1); // 在创建时抑制中断
    this.firstTask = firstTask;
    // 这里的this非常关键,如果不注意,就理解不了了
    // 在创建线程时会把当前对象Worker当作Runnable传入,当调用thread.start()方法时,
    // 实际上调用的是Worker当中的run()方法,即其中的runWorker()
    this.thread = getThreadFactory().newThread(this);
  }

  // Worker本身实现了Runnable接口
  public void run() {
    runWorker(this);
  }

  // Worker在内部通过AQS实现了独占锁,用来控制用户可以修改线程池中的线程相关的配置,例如核心线程数等
  // 0表示解锁状态,1表示锁定状态
  //========================================================
  // 获取是否锁定,true为锁定状态
  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  // 尝试通过CAS进行加锁
  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread()); //设置独占的线程
      return true;
    }
    return false;
  }

  // 尝试解锁
  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null); // 设置独占线程引用设置为null
    setState(0); // 将state设置为0,表示解锁(AQS中的state是通过volatile修饰的,修改后对其他线程都可见)
    return true;
  }

  public void lock()        { acquire(1); } //加锁,将status设置为1
  public boolean tryLock()  { return tryAcquire(1); } //通过CAS尝试加锁,如果可以加锁,则设置好独占的线程
  public void unlock()      { release(1); } //解锁,将status设置为0
  public boolean isLocked() { return isHeldExclusively(); } //判断是否被锁定,即判断status是否不等于0

  // 中断已经开始的线程
  void interruptIfStarted() {
    Thread t;
    // 不管是否加锁,0和1都>=0 && 线程引用不为空 && 没有被中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt(); // 中断线程
      } catch (SecurityException ignore) {
      }
    }
  }
  //========================================================
}

流程图

简单的流程图

ThreadPoolExecutor3-2020-09-09-17:05:18.jpg

总结

ThreadPoolExecutor中,使用一个 AtomicInteger的高3位来存储线程池的状态,以及低29位来存储线程总数

当我们通过线程池提交任务时,都会调用 execute()方法,在该方法中,会进行线程的一些控制,具体如下:

  • 如果当前线程数小于核心线程数时,会创建新线程(核心线程)来执行提交的任务
  • 如果当前线程数大于等于核心线程数时,会先将任务添加到队列中
  • 如果当前线程数大于等于核心线程数,并且队列已满,则会创建非核心线程来执行任务
  • 如果添加非核心线程失败(超过了最大线程数),则执行拒绝策略

ThreadPoolExecutor中实现了一个内部类 Worker,继承了 AQS和实现了 Runnable接口,在 Worker中实现了独占锁

在创建 Worker时,在内部维护了对应 Thread的引用,并将自身当作传入到 Thread中,当调用线程的 start()方法时,运行的是 Worker中的 run()方法,该方法会循环的执行任务,当执行完一个任务会,会调用 getTask()方法从队列中获取下一个任务,直到任务队列为空

getTask()方法中,会死循环的尝试获取任务,当一直没有获取到任务,并且超过设定的存活时间,那么非核心线程就会被中断

因此,当线程池中非核心线程在到达超时的过程中,其实都是在空转占用CPU(核心线程也是如此)


如果通过 getTask()获取到任务后,那么对应的 Worker就会直接调用任务对应的 run()来执行,并且在执行任务的前后还设置了两个回调函数 beforeExecute()afterExecute(),可以做一些自定义

0条评论
头像
ICP证 : 浙ICP备18021271号