继续研究Jdk7的新特性,这次看看jdk7对Concurrency的增强。concurrency这次新增了3个新元素。
- Phaser,类似于CountDownLatch和CyclicBarrier
- TransferQueue, 比SynchronousQueue更加高效的队列
- Fork-Join, 类似与MapReduce的东东
体验下Phaser, 这个东东和CountDownLatch,CyclicBarrier的功能比较类似,但是要更加灵活,提供了更多的控制。
先来回顾下CountDownLatch, CyclicBarrier。下面这段程序每次同时执行一组任务,然后当这一组任务执行完后,再执行下一组。用CyclicBarrier对每组的各个任务的执行时机进行控制,用CountDownLatch对各组任务的执行时机进行控制。
复习完毕,下面来把Phaser好好弄清楚。这个东东究竟是个啥,为啥能比CountDownLatch,CyclicBarrier更灵活呢。Phaser类的注释里对Phaser做了充分的描述,我们这里简单整理下。
Phaser是一个可重用的同步Barrier,类似与CountDownLatch和CyclicBarrier,但是要更加灵活
Registration. Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如 arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。
Arrival. 正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到 达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。
Termination.Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即 return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。
Tiering. Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。
描述已经好清楚了,下面我们用Phaser实现上面类似功能的程序。