} } }

    小学徒进阶系列—揭开ThreadPoolutor神秘的面纱

    添加时间:2013-5-14 点击量:

      前提择要:本文是基于jdk1.7的,在解析ThreadPoolutor代码的过程中百度时发明1.1.7的实现还是有必然的区此外并且还挺大的,小我感触感染1.6斗劲简单好懂得。


      为了便利大师浏览懂得,我把申明以注释的情势潜入到了代码中。


      关于线程池,它不仅有效的复用了对象,更有效的复用了线程,削减了线程创建,烧毁,恢复等状况切换的开销,进步了法度的机能。然则,毕竟线程池是怎么复用对象的呢?它又是如何去复用线程削减开销的呢?下面我们来一一揭开,ThreadPoolutor神秘的面纱。


     1.根蒂根基变量和办法


      为了可以或许更好的进行解析,我们先来做一些热身活动,懂得下线程池的几个首要的变量吧。


      1.起首大师好先懂得下原子变量的概念,具体可以参考官网文档:http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/atomic/package-summary.html


      2.在这里,我们先讲讲两个会贯穿全文的单词:1> workerCount:当前活动的线程数;2> runState:线程池的当前状况


      下面我们开端解析吧。


     1.1根蒂根基变量和办法



    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


      这个是用一个int来默示workerCount和runState的,此中runState占int的高3位,其它29位为workerCount的值。
      用AtomicInteger是因为其在并发下应用compareAndSet效力很是高;
      当改变当前活动的线程数时只对低29位操纵,如每次加一减一,workerCount的值变了,但不会影响高3位的runState的值。


      当改变当前状况的时辰,只对高3位操纵,不会改变低29位的计数值。
      这里有一个假设,就是当前活动的线程数不会跨越29位能默示的值,即不会跨越536870911,
      就今朝以及可预感的很长一段时候来讲,这个值是足够用了。同时遵守源代码中注释供给的说法,一旦将来跨越了AtomicInteger承受的局限,变量类型到时辰可以调换为AtomicLong类型。



     ------------------------------------------------------------------------我是神奇的分别线-----------------------------------------------------------------------------------




    private static final int COUNT_BITS = Integer.SIZE - 3;


      起首,Integer.SIZE的值为32,他减去3今后,值就为29


      COUNT_BITS,就是用来默示workerCount占用一个int的位数,其值为前面说的29



      ------------------------------------------------------------------------ 我是神奇的分别线 -----------------------------------------------------------------------------------




    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


      1右移认为后二进制的默示是00100000000000000000000000000000,减去1之后的值就是00011111111111111111111111111111(占29位,29个1)


      CAPACITY为29位能默示的最大容量,即workerCount实际能用的最大值(536870911)


     1.2线程池的状况


      接下来的几个变来那个描述的是关于线程池的状况,分别是:


      1> RUNNING : 该状况下线程池能接管新任务,并且可以运行队列中的任务


             -1的二进制为32个1,移位后为:11100000000000000000000000000000



    private static final int RUNNING    = -1 << COUNT_BITS;



      2> SHUTDOWN : 该状况下的线程池不再接管新任务,但仍可以履行队列中的任务


                0的二进制为32个0,移位后还是全0(00000000000000000000000000000000)



    private static final int SHUTDOWN   =  0 << COUNT_BITS;



      3> STOP : 该状况下的线程池不再接管新任务不再履行队列中的任务,并且要中断正在处理惩罚的任务
            1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000



    private static final int STOP       =  1 << COUNT_BITS;


      


      4>TIDYING : 该状况下的线程池所有任务均已终止,workerCount的值为0,转到TIDYING状况的线程即将要履行terminated()办法.
             2的二进制为00000000000000000000000000000010 移位后01000000000000000000000000000000



    private static final int TIDYING    =  2 << COUNT_BITS;


      


      5>TERMINATED : 该状况下的线程池申明 terminated()办法履行停止.
                 3的二进制为00000000000000000000000000000011,移位后01100000000000000000000000000000



    private static final int TERMINATED =  3 << COUNT_BITS;


       线程池各个状况间的转换:


      1>RUNNING -> SHUTDOWN : 调用了shutdown办法,线程池实现了finalize办法,在里面调用了shutdown办法,是以shutdown可能是在finalize中被隐式调用的


      2>(RUNNING or SHUTDOWN) -> STOP 调用了shutdownNow办法


      3>SHUTDOWN -> TIDYING : 当队列和线程池均为空的时辰


      4>STOP -> TIDYING : 当线程池为空的时辰


      5>TIDYING -> TERMINATED : terminated()办法调用完毕


     1.3根蒂根基办法  


      1> 这个办法用于取出当前活动线程的数量,也就是workerCount的值。



    /
    
    这个办法用于取出workerCount的值
    因为CAPACITY值为:00011111111111111111111111111111,所以&操纵将参数的高3地位0了
    保存参数的低29位,也就是workerCount的值
    @param c ctl, 存储runState和workerCount的int值
    @return workerCount的值
    /
    private static int workerCountOf(int c) { return c & CAPACITY; }


       


      2> 这个办法用于取出当火线程池的运行状况,也就是runState的值



    /
    
    这个办法用于取出runState的值
    因为CAPACITY值为:00011111111111111111111111111111
    ~为按位取反操纵,则~CAPACITY值为:11100000000000000000000000000000
    再同参数做&操纵,就将低29地位0了,而高3位还是对峙本来的值,也就是runState的值
    @param c 该参数为存储runState和workerCount的int值
    @return runState的值
    /
    private static int runStateOf(int c) { return c & ~CAPACITY; }


       


      3> 这个办法将runState和workerCount的值经由过程或运算存到同一个int中



    /
    
    将runState和workerCount存到同一个int中
    “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
    @param rs runState移位过后的值,负责填充返回值的高3位
    @param wc workerCount移位过后的值,负责填充返回值的低29位
    @return 两者或运算过后的值
    /
    private static int ctlOf(int rs, int wc) { return rs | wc; }



     2.关键办法的解析


      下面我们按照前面我们写的一个线程池的办法进行测试吧,此处只是摘取一点列取出来,具体完全代码请见《小学徒成长系列—线程同步、死锁、线程池》:



    executorService.execute(new TaskThread());    //创建任务并交给线程池进行经管


      1> 上方的进口是execute,那么我们就从execute()办法开端进行解析吧,为了便利大师浏览,我都以注释的情势直接写在代码上,或许这个过程斗劲呆板,然则只要你对峙下去,必然会受益匪浅。



     1  public void execute(Runnable command) {
    
    2 //任务为null,则抛出异常
    3 if (command == null
    4 throw new NullPointerException();
    5
    6 int c = ctl.get(); //取出记录着runState和workerCount 的 ctl的当前值
    7
    8 //经由过程workerCountOf办法从ctl所默示的int值中提取出低29位的值,也就是当前活动的线程数
    9 //若是(当前活动的线程 < corePoolSize)
    10 if (workerCountOf(c) < corePoolSize) {
    11 //创建新的线程
    12 //对于该函数形参,command就是恳求任务,
    13 //而true默示须要检测当前运行的线程是否小于corePoolSize
    14 //false默示须要检测当前运行的线程数量是否小于maxPoolSize
    15 if (addWorker(command, true))
    16 return; //创建线程成功,则停止该终止该办法的履行
    17 c = ctl.get(); //若是添加失败,则取出记录着runState和workerCount 的 ctl的当前值
    18 }
    19 //当火线程池处于运行状况且队列未满 && 若是线程正在运行中并且任务添加到缓冲队列成功
    20 if (isRunning(c) && workQueue.offer(command)) {
    21 int recheck = ctl.get(); //再次获取用于下面再次搜检
    22 if (! isRunning(recheck) && remove(command)) //若是线程池已经处于非运行状况,则从缓冲队列中移除任务并拒绝
    23 reject(command); //采取线程池指定的策略拒绝任务
    24 else if (workerCountOf(recheck) == 0) //若是线程池处于运行状况 或者线程池已经处于非运行状况然则任务移除失败
    25 addWorker(nullfalse);
    26 }
    27 // 1. 当火线程池并不处于Running状况
    28 // 2. 当火线程池处于Running状况,然则缓冲队列已经满了
    29 else if (!addWorker(command, false))
    30 reject(command); //采取线程池指定的策略拒绝任务
    31 }


      关于上方的execute(Runnable command),大项目组的申明都在代码的注释中啦。


      或许大师会有疑问:上方已经有了断定当前活动的线程小于corePoolSize了,那么便是和大于corePoolSize怎么处理惩罚呢?


      解答:不知道大师有没有重视到,当当前活动的线程数量 >= corePoolSize 的时辰,都是优先添加到队列中,直到队列满了才会去创建新的线程,在这里第20行的if语句已经表现出来了。这里哄骗了&&的特点,只有当第一个前提会真时才会去断定第二个前提,第一个前提是isRunning(),断定线程池是否处于RUNNING状况,因为只有在这个状况下才会接管新任务,不然就拒绝,若是正处于RUNNING状况,那么就参加队列,若是参加失败可能就是队列已经满了,这时辰直接履行第29行。



      2> 在execute()办法中,当 当前活动的线程数量 < corePoolSize 时,会履行addWorker()办法,关于addWorker(),它是用来直接新建线程用的,之所以叫addWorker而不是addThread是因为在线程池中,所有的线程都用一个Worker对象包装着,好吧,我们先来看看这个办法。



     1    /
    
    2 创建并履行新线程
    3 @param firstTack 用于指定新增的线程履行的第一个任务
    4
    5 @param core true默示在新增线程时会断定当前活动线程数是否少于corePoolSize,
    6 false默示新增线程前须要断定当前活动线程数是否少于maximumPoolSize
    7
    8 @return 是否成功新增一个线程
    9 /
    10 private boolean addWorker(Runnable firstTask, boolean core) {
    11 retry:
    12 for (;;) {
    13 int c = ctl.get(); //获取记录着runState和workCount的int变量的当前值
    14 int rs = runStateOf(c); //获取当火线程池运行的状况
    15
    16 //if语句中的前提转换成一个等价实现:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
    17 /
    18 这个前提代表着以下几个情景,就直接返回false申明线程创建失败:
    19 1.rs > SHUTDOWN; 此时不再接管新任务,且所有的任务已经履行完毕
    20 2.rs = SHUTDOWN; 此时不再接管新任务,然则会履行队列中的任务,在后买年的或语句中,第一个不成立,firstTask != null成立
    21 3.rs = SHUTDOWN;此时不再接管新任务,fistTask == null,任务队列workQueue已经空了
    22 /
    23 if (rs >= SHUTDOWN &&
    24 ! (rs == SHUTDOWN &&
    25 firstTask == null &&
    26 ! workQueue.isEmpty()))
    27 return false;
    28
    29 for (;;) {
    30 //获取当前活动的线程数
    31 int wc = workerCountOf(c);
    32 //先断定当前活动的线程数是否大于最大值,若是跨越了就直接返回false申明线程创建失败
    33 //若是没有跨越再按照core的值再进行以下断定
    34 /
    35 1.core为true,则断定当前活动的线程数是否大于corePoolSize
    36 2.core为false,则断定当前活动线程数是否大于maximumPoolSize
    37 /
    38 if (wc >= CAPACITY ||
    39 wc >= (core ? corePoolSize : maximumPoolSize))
    40 return false;
    41 //斗劲当前值是否和c雷同,若是雷同,则改为c+1,并且跳出大轮回,直接履行Worker进行线程创建
    42 if (compareAndIncrementWorkerCount(c))
    43 break retry;
    44 c = ctl.get(); // 获取ctl的当前值
    45 if (runStateOf(c) != rs) //搜检下当火线程池的状况是否已经产生改变
    46 continue retry; //若是已经改变了,则进行外层retry大轮回,不然只进行内层的轮回
    47 // else CAS failed due to workerCount change; retry inner loop
    48 }
    49 }
    50 //下面这里就是开端创建新的线程了
    51 //Worker的也是Runnable的实现类
    52 Worker w = new Worker(firstTask);
    53 //因为不成以直接在Worker的机关办法中进行线程创建
    54 //所以要把它的引用赋给t便利后面进行线程创建
    55 Thread t = w.thread;
    56
    57 final ReentrantLock mainLock = this.mainLock;
    58 mainLock.lock();
    59 try {
    60
    61 //再次取出ctl的当前值,用于进行状况的搜检,防止线程池的已经状况改变了
    62 int c = ctl.get();
    63 int rs = runStateOf(c);
    64
    65 //将if语句中的前提转换为一个等价实现 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))
    66 //有个t == null是因为若是应用的是默认的ThreadFactory的话,那么它的newThread()可能会返回null
    67 /
    68 1. 若是t == null, 则削减一个线程数,若是线程池处于的状况 > SHUTDOWN,则测验测验终止线程池
    69 2. 若是t != null,且rs == SHUTDOWN,则不再接管新任务,若firstTask != null,则此时也是返回false,创建线程失败
    70 3. 若是t != null, 且rs > SHUTDOWN,同样不再接管新任务,此时也是返回false,创建线程失败
    71 /
    72 if (t == null ||
    73 (rs >= SHUTDOWN &&
    74 ! (rs == SHUTDOWN &&
    75 firstTask == null))) {
    76 decrementWorkerCount(); //削减一个活动的当火线程数
    77 tryTerminate(); //测验测验终止线程池
    78 return false; //返回线程创建失败
    79 }
    80
    81 workers.add(w); //将创建的线程添加到workers容器中
    82
    83 int s = workers.size(); //获取当火线程活动的数量
    84 if (s > largestPoolSize) //断定当火线程活动的数量是否跨越线程池大线程数量
    85 largestPoolSize = s; //当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了
    86 } finally {
    87 mainLock.unlock();
    88 }
    89
    90 t.start(); //开启线程
    91 //若start后,状况又变成了SHUTDOWN状况(如调用了shutdownNow办法)且新建的线程没有被中断过,
    92 //就要中断该线程(shutdownNow办法请求中断正在履行的线程),
    93 //shutdownNow办法本身也会去中断存储在workers中的所有线程
    94 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
    95 t.interrupt();
    96
    97 return true;
    98 }


      那么在创建线程的时辰,线程履行的是什么的呢?


      我们前面提到Worker持续的其实也是Runnable,它在创建线程的时辰是以自身作为任务传进先创建的线程中的,这段斗劲简单,我就不一一注释了,只是给出源代码给大师看吧。



          Worker(Runnable firstTask) {
    
    this.firstTask = firstTask;
    //this指的是worker对象本身
    this.thread = getThreadFactory().newThread(this);
    }



       它以自身的对象作为线程任务传进去,那么它的run办法又是如何的呢?



     public void run() {
    
    runWorker(
    this);
    }


       竟然只有一句话调用runWorker()办法,这个可是重头戏,我们来看看,毕竟运行的是什么。



     1 /
    
    2 履行Worker中的任务,它的履行流程是如许的:
    3 若存在第一个任务,则先履行第一个任务,不然,从队列中拿任务,络续的履行,
    4 直到getTask()返回null或履行任务失足(中断或任务本身抛出异常),就退出while轮回。
    5 @param w woker
    6 /
    7 final void runWorker(Worker w) {
    8 Runnable task = w.firstTask; //将当前Worker中的任务取出来交给task,并开释掉w.firstTask占用的内存
    9 w.firstTask = null;
    10 //用于断定线程是否因为异常终止,若是不是异常终止,在后面将会将该变量的值改为false
    11 //该变量的值在processWorkerExit()会应用来断定线程是否因为异常终止
    12 boolean completedAbruptly = true;
    13 try {
    14 //履行任务,直到getTask()返回的值为null,在此处就相当于复用了线程,让线程履行了多个任务
    15 while (task != null || (task = getTask()) != null) {
    16 w.lock();
    17 clearInterruptsForTaskRun();//对线程池状况进行一次断定,后面我们会讲解一下该办法
    18 try {
    19 beforeute(w.thread, task); //在任务履行前须要做的逻辑办法,该方面可以由用户进行重写自定义
    20 Throwable thrown = null;
    21 try {
    22 task.run(); //开端履行任务
    23 } catch (RuntimeException x) {
    24 thrown = x; throw x;
    25 } catch (Error x) {
    26 thrown = x; throw x;
    27 } catch (Throwable x) {
    28 thrown = x; throw new Error(x);
    29 } finally {
    30 afterute(task, thrown); //在任务履行后须要做的逻辑办法,该方面可以由用户进行重写自定义
    31 }
    32 } finally {
    33 task = null;
    34 w.completedTasks++; //增长该线程完成的任务
    35 w.unlock();
    36 }
    37 }
    38 completedAbruptly = false; //线程不是异常终止
    39 } finally {
    40 processWorkerExit(w, completedAbruptly); //停止该线程
    41 }
    42 }


     下面就是线程在履行任务之前对线程池状况的一次断定:



     1     /
    
    2 对线程的停止做一些清理和数据同步
    3 @param w 封装线程的Worker
    4 @param completedAbruptly 默示该线程是否停止于异常
    5 /
    6 private void processWorkerExit(Worker w, boolean completedAbruptly) {
    7 // 若是completedAbruptly值为true,则申明线程是停止于异常
    8 //若是不是停止于异常,那么它降在runWorker办法的while轮回中的getTask()办法中已经减一了
    9 if (completedAbruptly)
    10 decrementWorkerCount(); //此时将线程数量减一
    11
    12 final ReentrantLock mainLock = this.mainLock;
    13 mainLock.lock();
    14 try {
    15 completedTaskCount += w.completedTasks; //统计统共完成的任务数
    16 workers.remove(w); //将该线程数从workers容器中移除
    17 } finally {
    18 mainLock.unlock();
    19 }
    20
    21 tryTerminate(); //测验测验终止线程池
    22
    23 int c = ctl.get();
    24 //接下来的这个if块要做的事儿了。当池的状况还是RUNNING,
    25 //又要分两种景象,一种是异常停止,一种是正常停止。异常停止斗劲好弄,直接加个线程调换死掉的线程就好了,
    26 //也就是最后的addWorker操纵
    27 if (runStateLessThan(c, STOP)) { //若是当前运行状况为RUNNING,SHUTDOWN
    28 if (!completedAbruptly) { //若是线程不是停止于异常
    29 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //是否容许线程超时停止
    30 if (min == 0 && ! workQueue.isEmpty()) //若是容许把那个且队列不为空
    31 min = 1; //至少要保存一个线程来完成任务
    32 //若是当前活动的线程数大于便是小值
    33 // 1.不容许核心线程超时停止,则必必要使得活动线程数跨越corePoolSize数才可以
    34 // 2. 容许核心线程超时停止,然则队列中有任务,必须留至少一个线程
    35 if (workerCountOf(c) >= min)
    36 return; // replacement not needed
    37 }
    38 //直接加个线程
    39 addWorker(nullfalse);
    40 }
    41 }



     前面我们的办法碰见过很多次tryTerminate()办法,到底他是如何测验测验停止线程池的呢?



     1     /
    
    2 履行该办法,按照线程池状况进行
    3 断定是否停止线程池
    4 /
    5 final void tryTerminate() {
    6 for (;;) {
    7 int c = ctl.get();
    8 if (isRunning(c) || //线程池正在运行中,天然不克不及停止线程池啦
    9 runStateAtLeast(c, TIDYING) || //若是状况为TIDYING或TERMINATED,池中的活动线程数已经是0,天然也不须要做什么操纵了
    10 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池出于SHUTDOWN状况,然则任务队列不为空,天然不克不及停止线程池啦
    11 return;
    12 if (workerCountOf(c) != 0) { // Eligible to terminate
    13 /
    14 调用这个办法的目标是将shutdown旌旗灯号传播给其它线程。
    15 调用shutdown办法的时辰会去中断所有余暇线程,若是这时辰池中所有的线程都正在履行任务,
    16 那么就不会有线程被中断,调用shutdown办法只是设置了线程池的状况为SHUTDOWN,
    17 在取任务(getTask,后面会细说)的时辰,假如许多线程都发明队列里还有任务(没有应用锁,存在竞态前提),
    18 然后都去调用take,若是任务数小于池中的线程数,那么必定有办法调用take后会一向守候(shutdown的时辰这些线程正在履行任务,
    19 所以没能调用它的interrupt,此中断状况没有被设置),那么在没有任务且线程池的状况为SHUTDWON的时辰,
    20 这些守候中的余暇线程就须要被终止iinterruptIdleWorkers(ONLY_ONE)归去中断一个线程,让其从take中退出,
    21 然后这个线程也进入同样的逻辑,去终止一个其它余暇线程,直到池中的活动线程数为0。
    22 /
    23 interruptIdleWorkers(ONLY_ONE);
    24 return;
    25 }
    26
    27 final ReentrantLock mainLock = this.mainLock;
    28 mainLock.lock();
    29 try {
    30 /
    31 当状况为SHUTDOWN,且活动线程数为0的时辰,就可以进入TIDYING状况了,
    32 进入TIDYING状况就可以履行办法terminated(),
    33 该办法履行停止就进入了TERMINATED状况(参考前文中各状况的含义以及可能的状况改变)
    34 /
    35 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    36 try {
    37 terminated(); //履行该办法,停止线程池
    38 } finally {
    39 ctl.set(ctlOf(TERMINATED, 0));
    40 /
    41 当线程池shutdown后,外部可能还有很多线程在守候线程池真正停止,
    42 即调用了awaitTermination办法,该办法中,外部线程就是在termination上await的,
    43 所以,线程池封闭之前要唤醒这些守候的线程,告诉它们线程池封闭停止了。
    44 /
    45 termination.signalAll();
    46 }
    47 return;
    48 }
    49 } finally {
    50 mainLock.unlock();
    51 }
    52 // else retry on failed CAS
    53 }
    54 }




    原来,再大的房子,再大的床,没有相爱的人陪伴,都只是冰冷的物质。而如果身边有爱人陪伴,即使房子小,床小,也觉得无关紧要,因为这些物质上面有了爱的温度,成了家的元素。—— 何珞《婚房》#书摘#
    分享到: