[译]How To Implement Inter-thread Communication In Java

原文地址:https://www.tutorialdocs.com/article/java-inter-thread-communication.html
文章涉及代码地址:https://github.com/wingjay/HelloJava/blob/master/multi-thread/src/ForArticle.java

虽然一般来说每个子线程只需要完成自己的任务,但是有时需要多线程一起工作来完成一个任务,这就需要线程间的通信。

使用了几个例子来即时如何用Java实现线程间通信:

  • 如何让两个线程顺序执行?
  • 如何让两个线程以指定的方式交叉执行?
  • 有四个线程:A,B,C和D。A,B和C同步执行结束后,再执行D。
  • 三个线程分开准备,然后在每个线程准备好后同时开始执行。
  • 子线程完成任务后,向主线程返回结果。

如何让两个线程顺序执行?

假设有两个线程: 线程A和线程B。两个线程都顺序打印三个数字(1-3)。

    public static void main(String[] args){
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                printNumber("A");
            }
        });

        Thread B = new Thread(new Runnable() {
            @Override
            public void run() {
                printNumber("B");
            }
        });
        A.start();
        B.start();
    }

    // Print Number in thread with threadName
    private static void printNumber(String threadName){

        for(int i = 0; i < 3; i ++){
            try {
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            System.out.println(threadName + ": " + i);
        }
    }

会得到结果:

A: 0
B: 0
B: 1
A: 1
B: 2
A: 2

并不是唯一结果,可能会出现其他排序,但是值永远都是001122

如果我们想要B线程在A线程之后打印数字改怎么做? 使用thread.join()方法:

    Thread B = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                A.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            printNumber("B");
        }
    });

结果是:

A: 0
A: 1
A: 2
B: 0
B: 1
B: 2

如何让两个线程以指定的方式交叉执行?

如果现在想要让B线程只在A打印了1之后开始打印数字,然后A再继续打印2和3,要怎么做? 很显然,我们需要更细粒度的锁来控制执行顺序。 我么可以使用object.wait()object.motify()方法。

    public static void main(String[] args){
        Object lock = new Object();

        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("A: " + 1);

                    try {
                        lock.wait();
                    }catch (Exception e){
                        e.printStackTrace();
                    }

                    System.out.println("A: " + 2);
                    System.out.println("A: " + 3);
                }

            }
        });

        Thread B = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("B: " + 1);
                    System.out.println("B: " + 2);
                    System.out.println("B: " + 3);

                    lock.notify();
                }
            }
        });
        A.start();
        B.start();
    }

输出结果为:

A: 1
B: 1
B: 2
B: 3
A: 2
A: 3

在这个过程中发生了什么:

  • 创建了一个对象,用于A和B锁对象
  • 当A获取了锁,首先打印1,然后调用locak.wait()方法,进入等待状态,并放弃锁的控制
  • B线程直到A调用了lock.wait()方法释放锁的控制后才会获取锁并执行
  • B获取锁后打印1,2,3,然后调用lock.notify()方法来唤醒等待中的A线程
  • A在被唤醒后,继续打印剩下的2,3 加上了注释的代码:
    public static void main(String[] args){
        Object lock = new Object();

        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("INFO: A 线程等待lock");
                synchronized (lock) {
                    System.out.println("INFO: A 获得lock锁");
                    try {
                        Thread.sleep(100);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    System.out.println("A: " + 0);
                    try {
                        System.out.println("INFO: A 线程准备进入Waiting状态,释放lock的控制");
                        lock.wait();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    System.out.println("INFO: A线程被唤醒,重新获取lock的控制");
                    System.out.println("A: " + 1);
                    System.out.println("A: " + 2);
                }

            }
        });

        Thread B = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("INFO: B 线程等待lock");
                synchronized (lock) {
                    System.out.println("INFO: B 线程获得lock");
                    System.out.println("B: " + 1);
                    System.out.println("B: " + 2);
                    System.out.println("B: " + 3);
                    System.out.println("INFO: B 线程结束打印并调用notify方法");
                    lock.notify();
                }
            }
        });
        A.start();
        B.start();
    }

有四个线程:A,B,C和D。A,B和C同步执行结束后,再执行D

使用thread.join()可以让线程等待另一个线程执行完成后继续执行,但不符合现在要做的ABC需要同步执行然后执行D的场景。 我们想要达到的目标是:有三个线程A,B,C,可以同时开始执行,并且每个线程当单独执行完后将通知D;直到A,B和C都结束运行之前D不会开始运行。所以我们使用CountdownLatch来实现这种类型的沟通。它基础的用法是:

  1. 创建一个计时器,然后设置一个初始值,CountdownLatch countDownLatch = new COuntDownLatch(3);
  2. 在待命的相中中调用countDownLatch.await()方法,它将处于等待状态,直到计数值编程0
  3. 在其他线程中调用countDownLatch.countDown()方法,这个方法将计数值减一
  4. 当在其他线程中countDown()方法将计数值编程0时,在待命线程中的countDownLatch.await()方法将立即退出,并继续执行后续的代码

实现的代码如下:

    public static void main(String[] args){
        int worker = 3;
        CountDownLatch countDownLatch = new CountDownLatch(worker);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    countDownLatch.await();
                    System.out.println("开始执行等待线程D");
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();

        for (char threadName='A'; threadName <= 'C'; threadName++) {
            final String tN = String.valueOf(threadName);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程" + tN + "开始执行");
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程" + tN + "运行结束");
                    countDownLatch.countDown();
                }
            }).start();
        }
    }

返回的结果是:

线程A开始执行
线程B开始执行
线程C开始执行
线程C运行结束
线程A运行结束
线程B运行结束
开始执行等待线程D

事实上,CountDownLatch自身就是一个计数器,我们设置它的初始值为3.当D开始运行时,它首先调用countDownLatch.await()方法来确认计数器的值是否是0,并且如果计数器的值不为0它会留在等待状态。 A,B和C每个分别会在运行结束后,使用countDownLatch.countDown()方法来使倒数计数器的值减1。 然后,线程D的await()方法在A,B和C结束后被启动,并且D将继续执行后面的代码。 因此,CountDownLatch在这种一个线程需要等待多个线程的情况下是非常合适的。

延伸拓展:《警报:线上事故之CountDownLatch的威力》

原文地址:警报:线上事故之CountDownLatch的威力

在分布式系统中,通过使用CountDownLatch的countDown()方法来对不同数据源初始化连接进行计数,当所有数据源都连接完成后,继续执行后续的逻辑。 在文章中,出现了某一个数据源因为未知原因,处于不健康状态,因此没有调用对应的countDown()方法,导致计数器一直无法到0,以致于程序无法继续运行。

反思

只要不调用CountDownLatch.countDown(),就需要一直等下去。对于这样的事件,使用CountDownLatch有一定责任,太过于相信集群的健康程度以及监控,需要通过代码把控最大连接超时时间,即如果指定时间内未返回,就调用countDown()使其他能正常运行。 当然,也可能在那时出现网络延迟,导致请求返回失败,这可以通过尝试机制解决。重试3次连接请求,均不行,就调用countDown()方法。

总结
  • 需要将结果纳入可控的范围中,不能过度依赖于监控指标
  • 针对长耗时的业务,一定要做超时机制
  • CountDownLatch在高并发场景很实用,但需要合理使用

三个线程分开准备,然后在每个线程准备好后同时开始执行。

这次,三个线程A,B和C需要分开准别,然后在他们三个都准备好之后,同时开始执行。应该如何实现他们? 上面的CountDownLatch可以被用作倒数计数器,但是当计数完成后,只有一个线程的await()方法会得到反馈,所以多线程不能同时被触发启动。 为了实现多个线程等待其他线程的效果,我们可以使用CyclicBarrier数据结构,它的基本用法是:

  1. 首先创建一个公共的对象CyclicBarrier,然后是设置同时等待的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  2. 三个线程同时开始准备。等他们准备好后,他们需要等待其他线程准备完毕,所以调用cyclicBarrier.await()方法来等待其他线程
  3. 当这几个同时需要等待的线程都调用了cyclicBarrier.await()方法时,这意味着这些线程已经准备好,然后这些线程将同时开始指向后续的逻辑

实现代码如下:

public static void main(String[] args){
        int runner = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
        final Random random = new Random();
        for(char runnerName = 'A'; runnerName <= 'C'; runnerName ++ ){
            final String rN = String.valueOf(runnerName);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    long prepareTime = random.nextInt(10000) + 100;
                    System.out.println("运动员" + rN + "开始准备,需要的时间:" + prepareTime);
                    try{
                        Thread.sleep(prepareTime);
                    }catch (Exception e){
                        e.printStackTrace();
                    }

                    try {
                        System.out.println("运动员" + rN + "已经准备完成,等待其他选手");
                        cyclicBarrier.await();
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }catch (BrokenBarrierException e){
                        e.printStackTrace();
                    }

                    System.out.println("运动员" + rN + "开始出发");
                }
            }).start();
        }
    }

结果如下

运动员A开始准备,需要的时间:614
运动员B开始准备,需要的时间:7059
运动员C开始准备,需要的时间:8652
运动员A已经准备完成,等待其他选手
运动员B已经准备完成,等待其他选手
运动员C已经准备完成,等待其他选手
运动员C开始出发
运动员A开始出发
运动员B开始出发

子线程完成任务后,向主线程返回结果

在实际开发中,我们经常需要创建需要子线程来执行花时间的任务,然后将执行结果传回主线程中。我们在Java中怎么实现这个呢? 一般来说,当我们创建线程时,我们将传送Runnable对象来执行线程。 查看Runnable接口本身可知,里面的run()方法没有返回任何执行结果。 如果我们需要返回结果该怎么办呢?这里你可以使用另一个接口类Callable。 查看Callable接口发现,有一个最大的区别在于能返回值。 所以下一个问题是,如果将子线程的结果传回去? Java有一个类叫FutureTask,可以喝Callable一起使用,但需要注意的是用来获取结果的get方法将会阻塞主线程。 举个例子,我们想要子线程计算从1到100的和返回给主线程

    public static void main(String[] args){
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Task starts");
                Thread.sleep(1000);
                int result = 0;
                for (int i=0; i<=100; i++) {
                    result += i;
                }
                System.out.println("Task finished and return result");
                return result;
            }
        };
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        new Thread(futureTask).start();

        try {
            System.out.println("Before futureTask.get()");
            System.out.println("Result:" + futureTask.get());
            System.out.println("After futureTask.get()");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

返回结果是

Task starts
Before futureTask.get()
Task finished and return result
Result:5050
After futureTask.get()

这可以看出,当主线程调用了futureTask.get()方法时子线程阻塞了主线程;然后Callable执行完成后返回结果,然后futureTask.get()获取结果,然后主线程继续执行。 我们从这里能学到FutureTaskCallable可以用来在主线程中直接从子线程中获取结果,但是它们将会阻塞主线程。 当然,如果你不希望阻塞主线程,可以考虑使用ExecutorService,将FutureTask放入线程翅中来管理线程执行。


总结

在现代语言中,多线程是常用的功能,并且线程间通信,线程同步和线程安全是非常重要的话题。