Cyclic Barrier

CyclicBarrier

如何使用

CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

// 随机休息一段时间
Runnable work = () -> {
  final int sleep = Math.abs(new Random().nextInt() % 10);
  Tool.println("sleep " + sleep);
  ThreadUtil.sleep(sleep, TimeUnit.SECONDS);
};

// 调用 barrier.await()
Consumer<CyclicBarrier> await = (CyclicBarrier barrier) -> {
  try {
    barrier.await();
  } catch (Exception e) {
    e.printStackTrace();
  }
};

final Runnable runnable = () -> {
  work.run();
  await.accept(cyclicBarrier); // 等待所有线程准备好
  Tool.println("A ");

  work.run();
  await.accept(cyclicBarrier); // 等待所有线程准备好
  Tool.println("B");
};

new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
> 566 : Thread-2 : sleep 3
> 566 : Thread-1 : sleep 2
> 566 : Thread-0 : sleep 1
> 569 : Thread-2 : ---A--- // 3 秒后同时执行
> 569 : Thread-0 : ---A---
> 569 : Thread-1 : ---A---
> // 
> 569 : Thread-2 : sleep 6
> 569 : Thread-0 : sleep 6
> 569 : Thread-1 : sleep 5
> 575 : Thread-0 : ---B---  // 6 秒后同时执行
> 575 : Thread-2 : ---B---
> 575 : Thread-1 : ---B---

实现原理

CyclicBarrier 并非直接基于 AQS, 而是基于 ReentrantLock 和 Condition

  • 所有线程的 await 操作 都是 lock() 操作,获取到锁之后,凭证数减1
  • 如果不到 0 ,就 lock.condition.await ,等待被唤醒
  • 如果到 0 了,继续执行并 lock.condition.signalAll,唤醒所有等待的线程继续执行
  • 并重新开始计数

await

public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen
  }
}

dowait

private int dowait(boolean timed, long nanos)  throws ... {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    ...

    int index = --count;
    // ❤❤❤ index 减到 0,从新开始下一次计数 ❤❤❤
    if (index == 0) {  // tripped
      boolean ranAction = false;
      try {
        ...
        nextGeneration();
        return 0;
      } finally {
        if (!ranAction)
          breakBarrier();
      }
    }

    for (;;) {
      try {
        if (!timed)
          // ❤❤❤ await 释放锁 ❤❤❤
          trip.await();
        else if (nanos > 0L)
          // ❤❤❤ await 释放锁 ❤❤❤
          nanos = trip.awaitNanos(nanos);
      } catch (InterruptedException ie) {
        ...
      }
      ...
    }
  } finally {
    lock.unlock();
  }
}

breakBarrier

private void breakBarrier() {
  // 重置栅栏
  // 线程中断后需要 reset(), 否则会抛出 BrokenBarrierException 异常
  generation.broken = true;
  // 重置计数
  count = parties;
  // 唤醒所有 持有锁并等待的线程
  trip.signalAll();
}

nextGeneration

private void nextGeneration() {
  // 唤醒所有 持有锁并等待的线程
  trip.signalAll();
  // 重置计数
  count = parties;
  // generation.broken = false;
  generation = new Generation();
}

Read More