Syncing with CyclicBarrier

CyclicBarrier is useful in situations where you want each thread in a set of threads to reach at specific execution point before moving further.

How to implement?

CyclicBarrier constructor received two arguments.

  • No. of threads to reach the barrier. Minimum 1 is allowed here. Anything less than 1 will trigger IllegalArgumentException()
  • Optional runnable task to invoke, once all threads reach the barrier and before resuming threads blocked by barrier.

await() function is invoked in thread to make current thread wait until all other threads invoke await() i.e. reach the barrier or one of the following situations occur

  • The last thread arrives
  • Some other thread interrupts the current thread
  • Some other thread interrupts one of the other waiting threads
  • Some other thread times out while waiting for barrier
  • Some other thread invokes reset on this barrier.

Example

Let’s make a simple program to demonstrate the behaviour of cyclic barrier. We create a CyclicBarrier which waits for await() method to be invoked three times. In each thread, we wait for some random time and print data about thread. Finally, we print a message once each thread reaches the barrier.

MyThread.java

class MyThread extends Thread {

    private final CyclicBarrier barrier;

    MyThread(String name, CyclicBarrier barrier){
        super(name);
        this.barrier = barrier;
    }

    @Override
    public void run() {
        System.out.printf("running %s \n", getName());
        int sleepTime = new Random().nextInt(2000);
        System.out.printf("[%s] sleeping for %dms\n", getName(), sleepTime);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("[%s] invoking await on barrier[%s]\n", getName(), barrier.hashCode());

        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.printf("[%s] after waiting for barrier[%s]\n", getName(), barrier.hashCode());
    } // run

} // MyThread

Barrier Main class

public class BarrierApp {

    public static void run(){
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("\nall threads have reached the barrier.\n"));

        new MyThread("A", barrier).start();
        new MyThread("B", barrier).start();
        new MyThread("C", barrier).start();


    } // run

} // BarrierApp

Output

running A
running C
running B
[A] sleeping for 1496ms
[C] sleeping for 450ms
[B] sleeping for 1435ms
[C] invoking await on barrier[6867595]
[B] invoking await on barrier[6867595]
[A] invoking await on barrier[6867595]

all threads have reached the barrier.

[A] after waiting for barrier[6867595]
[B] after waiting for barrier[6867595]
[C] after waiting for barrier[6867595]

Repeating Cycle

Unlike CountDownLatch, CyclicBarrier is repeatable. We can again invoke await() of new threads and CyclicBarrier will trigger completion tasks and release thread when a barrier is reached or any of above discussed situation occurs.

Modify the above example, add following code in BarrierApp.run()

    new MyThread("A", barrier).start();
    new MyThread("B", barrier).start();
    new MyThread("C", barrier).start();
    // repeat
    new MyThread("Aa", barrier).start();
    new MyThread("Bb", barrier).start();
    new MyThread("Cc", barrier).start();

Output for repeated cycle

Starting CyclicBarrier demo...
running B
running Cc
running Bb
running Aa
running C
running A
[Cc] sleeping for 1091ms
[Bb] sleeping for 81ms
[C] sleeping for 1385ms
[B] sleeping for 1677ms
[Aa] sleeping for 777ms
[A] sleeping for 1371ms
[Bb] invoking await on barrier[1122215709]
[Aa] invoking await on barrier[1122215709]
[Cc] invoking await on barrier[1122215709]

all threads have reached the barrier.

[Cc] after waiting for barrier[1122215709]
[Aa] after waiting for barrier[1122215709]
[Bb] after waiting for barrier[1122215709]
[A] invoking await on barrier[1122215709]
[C] invoking await on barrier[1122215709]
[B] invoking await on barrier[1122215709]

all threads have reached the barrier.

[B] after waiting for barrier[1122215709]
[C] after waiting for barrier[1122215709]
[A] after waiting for barrier[1122215709]

Resetting Barrier

Calling reset on Barrier resets the barrier and throws BrokenBarrierException() if there are threads awaiting for barrier.

Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to re-synchronize in some other way, and choose one to perform the reset. It may be preferable to instead create a new barrier for subsequent use.

Modify the BarrierApp.run() method as below

 new MyThread("A", barrier).start();
 new MyThread("B", barrier).start();

 Thread.sleep(5000); // avoid race condition
 barrier.reset();

Output

running A
running B
[A] sleeping for 561ms
[B] sleeping for 1330ms
[A] invoking await on barrier[42326203]
[B] invoking await on barrier[42326203]
[A] after waiting for barrier[42326203]
[B] after waiting for barrier[42326203]
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at io.github.allaudin.cbarrier.MyThread.run(MyThread.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at io.github.allaudin.cbarrier.MyThread.run(MyThread.java:35)

top