高并发中的线程通信

本文来自我没有三颗心脏的投稿

前序文章

一、经典的生产者消费者案例


上一篇文章我们提到一个应用可以创建多个线程去执行不同的任务,如果这些任务之间有着某种关系,那么线程之间必须能够通信来协调完成工作。

【万字长文】高并发中的线程通信,看这一篇就够了!生产者消费者问题(英语:Producer-consumer problem)就是典型的多线程同步案例,它也被称为有限缓冲问题(英语:Bounded-buffer problem)。

该问题描述了共享固定大小缓冲区[2]的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。(摘自维基百科:生产者消费者问题[3])

注意:生产者-消费者模式中的内存缓存区的主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者的性能差;

准备基础代码:无通信的生产者消费者

我们来自己编写一个例子:一个生产者,一个消费者,并且让他们让他们使用同一个共享资源,并且我们期望的是生产者生产一条放到共享资源中,消费者就会对应地消费一条。

我们先来模拟一个简单的共享资源对象:

public class ShareResource {

    private String name;
    private String gender;

    /**
     * 模拟生产者向共享资源对象中存储数据
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        this.name = name;
        this.gender = gender;
    }

    /**
     * 模拟消费者从共享资源中取出数据
     */
    public void popup() {
        System.out.println(this.name + "-" + this.gender);
    }
}

然后来编写我们的生产者,使用循环来交替地向共享资源中添加不同的数据:

public class Producer implements Runnable {

    private ShareResource shareResource;

    public Producer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            if (i % 2 == 0) {
                shareResource.push("凤姐", "女");
            } else {
                shareResource.push("张三", "男");
            }
        }
    }
}

接着让我们的消费者不停地消费生产者产生的数据:

public class Consumer implements Runnable {

    private ShareResource shareResource;

    public Consumer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            shareResource.popup();
        }
    }
}

然后我们写一段测试代码,来看看效果:

public static void main(String[] args) {
    // 创建生产者和消费者的共享资源对象
    ShareResource shareResource = new ShareResource();
    // 启动生产者线程
    new Thread(new Producer(shareResource)).start();
    // 启动消费者线程
    new Thread(new Consumer(shareResource)).start();
}

我们运行发现出现了诡异的现象,所有的生产者都似乎消费到了同一条数据:

张三-男
张三-男
....以下全是张三-男....

为什么会出现这样的情况呢?照理说,我的生产者在交替地向共享资源中生产数据,消费者也应该交替消费才对呀..

我们大胆猜测一下,会不会是因为消费者是直接循环了 30 次打印共享资源中的数据,而此时生产者还没有来得及更新共享资源中的数据,消费者就已经连续打印了 30 次了,所以我们让消费者消费的时候以及生产者生产的时候都小睡个 10 ms 来缓解消费太快 or 生产太快带来的影响,也让现象更明显一些:

/**
 * 模拟生产者向共享资源对象中存储数据
 *
 * @param name
 * @param gender
 */
public void push(String name, String gender) {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.name = name;
    this.gender = gender;
}

/**
 * 模拟消费者从共享资源中取出数据
 */
public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}

再次运行代码,发现了出现了以下的几种情况:

  • 重复消费:消费者连续地出现两次相同的消费情况(张三-男/ 张三-男);
  • 性别紊乱:消费者消费到了脏数据(张三-女/ 凤姐-男);

分析出现问题的原因

  • 重复消费:我们先来看看重复消费的问题,当生产者生产出一条数据的时候,消费者正确地消费了一条,但是当消费者再来共享资源中消费的时候,生产者还没有准备好新的一条数据,所以消费者就又消费到老数据了,这其中的根本原因是生产者和消费者的速率不一致
  • 性别紊乱:再来分析第二种情况。不同于上面的情况,消费者在消费第二条数据时,生产者也正在生产新的数据,但是尴尬的是,生产者只生产了一半儿(也就是该执行完 <span style="font-size:">this.name = name</span>),也就是还没有来得及给 <span style="font-size:">gender</span> 赋值就被消费者给取走消费了.. 造成这样情况的根本原因是没有保证生产者生产数据的原子性

解决出现的问题

加锁解决性别紊乱

我们先来解决性别紊乱,也就是原子性的问题吧,上一篇文章里我们也提到了,对于这样的原子性操作,解决方法也很简单:加锁

稍微改造一下就好了:

/**
 * 模拟生产者向共享资源对象中存储数据
 *
 * @param name
 * @param gender
 */
synchronized public void push(String name, String gender) {
    this.name = name;
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.gender = gender;
}

/**
 * 模拟消费者从共享资源中取出数据
 */
synchronized public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}
  • 我们在方法前面都加上了 <span style="font-size:">synchronized</span> 关键字,来保证每一次读取和修改都只能是一个线程,这是因为当 <span style="font-size:">synchronized</span> 修饰在普通同步方法上时,它会自动锁住当前实例对象,也就是说这样改造之后读/ 写操作同时只能进行其一;
  • 我把 <span style="font-size:">push</span> 方法小睡的代码改在了赋值 <span style="font-size:">name</span><span style="font-size:">gender</span> 的中间,以强化验证原子性操作是否成功,因为如果不是原子性的话,就很可能出现赋值 <span style="font-size:">name</span> 还没赋值给 <span style="font-size:">gender</span>就被取走的情况,小睡一会儿是为了加强这种情况的出现概率(可以试着把 <span style="font-size:">synchronized</span> 去掉看看效果);

运行代码后发现,并没有出现性别紊乱的现象了,但是重复消费仍然存在。

等待唤醒机制解决重复消费

我们期望的是 <span style="font-size:">张三-男</span><span style="font-size:">凤姐-女</span> 交替出现,而不是有重复消费的情况,所以我们的生产者和消费者之间需要一点沟通

最容易想到的解决方法是,我们新增加一个标志位,然后在消费者中使用 <span style="font-size:">while</span> 循环判断,不满足条件则不消费,条件满足则退出 <span style="font-size:">while</span> 循环,从而完成消费者的工作。

while (value != desire) {
    Thread.sleep(10);
}
doSomething();

这样做的目的就是为了防止「过快的无效尝试」,这种方法看似能够实现所需的功能,但是却存在如下的问题:

  • 1)难以确保及时性。在睡眠时,基本不消耗处理器的资源,但是如果睡得过久,就不能及时发现条件已经变化,也就是及时性难以保证;
  • 2)难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能够更加迅速地发现条件变化,但是却可能消耗更多的处理资源,造成了无端的浪费。

以上两个问题吗,看似矛盾难以调和,但是 Java 通过内置的等待/ 通知机制能够很好地解决这个矛盾并实现所需的功能。

等待/ 通知机制,是指一个线程 A 调用了对象 O 的 <span style="font-size:">wait()</span> 方法进入等待状态,而另一个线程 B 调用了对象 O 的 <span style="font-size:">notifyAll()</span> 方法,线程 A 收到通知后从对象 O 的 <span style="font-size:">wait()</span> 方法返回,进而执行后续操作。

上述两个线程都是通过对象 O 来完成交互的,而对象上的 <span style="font-size:">wait</span><span style="font-size:">notify/ notifyAll</span> 的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

这里有一个比较奇怪的点是,为什么看起来像是线程之间操作的 <span style="font-size:">wait</span><span style="font-size:">notify/ notifyAll</span>方法会是 <span style="font-size:">Object</span> 类中的方法,而不是 <span style="font-size:">Thread</span> 类中的方法呢?

  • 简单来说:因为 <span style="font-size:">synchronized</span> 中的这把锁可以是任意对象,因为要满足任意对象都能够调用,所以属于 <span style="font-size:">Object</span> 类;
  • 专业点说:因为这些方法在操作同步线程时,都必须要标识它们操作线程的锁,只有同一个锁上的被等待线程,可以被同一个锁上的 <span style="font-size:">notify</span> 唤醒,不可以对不同锁中的线程进行唤醒。也就是说,等待和唤醒必须是同一个锁。而锁可以是任意对象,所以可以被任意对象调用的方法是定义在 <span style="font-size:">Object</span> 类中。

好,简单介绍完等待/ 通知机制,我们开始改造吧:

public class ShareResource {

    private String name;
    private String gender;
    // 新增加一个标志位,表示共享资源是否为空,默认为 true
    private boolean isEmpty = true;

    /**
     * 模拟生产者向共享资源对象中存储数据
     *
     * @param name
     * @param gender
     */
    synchronized public void push(String name, String gender) {
        try {
            while (!isEmpty) {
                // 当前共享资源不为空的时,则等待消费者来消费
                // 使用同步锁对象来调用,表示当前线程释放同步锁,进入等待池,只能被其他线程所唤醒
                this.wait();
            }
            // 开始生产
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生产结束
            isEmpty = false;
            // 生产结束唤醒一个消费者来消费
            this.notify();
        } catch (Exception ignored) {
        }
    }

    /**
     * 模拟消费者从共享资源中取出数据
     */
    synchronized public void popup() {
        try {
            while (isEmpty) {
                // 为空则等着生产者进行生产
                // 使用同步锁对象来调用,表示当前线程释放同步锁,进入等待池,只能被其他线程所唤醒
                this.wait();
            }
            // 消费开始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消费结束
            isEmpty = true;
            // 消费结束唤醒一个生产者去生产
            this.notify();
        } catch (InterruptedException ignored) {
        }
    }
}
  • 我们期望生产者生产一条,然后就去通知消费者消费一条,那么在生产和消费之前,都需要考虑当前是否需要生产 or 消费,所以我们新增了一个标志位来判断,如果不满足则等待;
  • 被通知后仍然要检查条件,条件满足,则执行我们相应的生产 or 消费的逻辑,然后改变条件(这里是 <span style="font-size:">isEmpty</span>),并且通知所有等待在对象上的线程;
  • 注意:上面的代码中通知使用的 <span style="font-size:">notify()</span> 方法,这是因为例子中写死了只有一个消费者和生产者,在实际情况中建议还是使用 <span style="font-size:">notifyAll()</span> 方法,这样多个消费和生产者逻辑也能够保证(可以自己试一下);

小结

通过初始版本一步步地分析问题和解决问题,我们就差不多写出了我们经典生产者消费者的经典代码,但通常消费和生产的逻辑是写在各自的消费者和生产者代码里的

这里我为了方便阅读,把他们都抽离到了共享资源上,我们可以简单地再来回顾一下这个消费生产和等待通知的整个过程:

【万字长文】高并发中的线程通信,看这一篇就够了!以上就是关于生产者生产一条数据,消费者消费一次的过程了,涉及的一些具体细节我们下面来说。

二、线程间的通信方式


等待唤醒机制的替代:Lock 和 Condition

我们从上面的中看到了 <span style="font-size:">wait()</span><span style="font-size:">notify()</span> 方法,只能被同步监听锁对象来调用,否则就会报出 <span style="font-size:">IllegalMonitorZStateException</span> 的异常,那么现在问题来了,我们在上一篇提到的 <span style="font-size:">Lock</span> 机制根本就没有同步锁了,也就是没有自动获取锁和自动释放锁的概念,因为没有同步锁,也就意味着 <span style="font-size:">Lock</span> 机制不能调用 <span style="font-size:">wait</span><span style="font-size:">notify</span> 方法,我们怎么办呢?

好在 Java 5 中提供了 Lock 机制的同时也提供了用于 Lock 机制控制通信的 Condition 接口,如果大家理解了上面说到的 <span style="font-size:">Object.wait()</span><span style="font-size:">Object.notify()</span> 方法的话,那么就能很容易地理解 Condition 对象了。

它和 <span style="font-size:">wait()</span><span style="font-size:">notify()</span> 方法的作用是大致相同的,只不过后者是配合 <span style="font-size:">synchronized</span> 关键字使用的,而 Condition 是与重入锁相关联的。通过 Lock 接口(重入锁就实现了这一接口)的 <span style="font-size:">newCondition()</span> 方法可以生成一个与当前重入锁绑定的 Condition 实例。利用 Condition 对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。

我们拿上面的生产者消费者来举例,修改成 Lock 和 Condition 代码如下:

public class ShareResource {

    private String name;
    private String gender;
    // 新增加一个标志位,表示共享资源是否为空,默认为 true
    private boolean isEmpty = true;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /**
     * 模拟生产者向共享资源对象中存储数据
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        lock.lock();
        try {
            while (!isEmpty) {
                // 当前共享资源不为空的时,则等待消费者来消费
                condition.await();
            }
            // 开始生产
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生产结束
            isEmpty = false;
            // 生产结束唤醒消费者来消费
            condition.signalAll();
        } catch (Exception ignored) {
        } finally {
            lock.unlock();
        }
    }

    /**
     * 模拟消费者从共享资源中取出数据
     */
    public void popup() {
        lock.lock();
        try {
            while (isEmpty) {
                // 为空则等着生产者进行生产
                condition.await();
            }
            // 消费开始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消费结束
            isEmpty = true;
            // 消费结束唤醒生产者去生产
            condition.signalAll();
        } catch (InterruptedException ignored) {
        } finally {
            lock.unlock();
        }
    }
}

在 JDK 内部,重入锁和 Condition 对象被广泛地使用,以 ArrayBlockingQueue 为例,它的 <span style="font-size:">put()</span> 方法实现如下:

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

// 构造函数,初始化锁以及对应的 Condition 对象
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 等待队列有足够的空间
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 通知需要 take() 的线程,队列已有数据
    notEmpty.signal();
}

同理,对应的 <span style="font-size:">take()</span> 方法实现如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果队列为空,则消费者队列要等待一个非空的信号
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

允许多个线程同时访问:信号量(Semaphore)

以下内容摘录 or 改编自 《实战 Java 高并发程序设计》 3.1.3 节的内容

信号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展,无论是内部锁 synchronized 还是重入锁 ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。信号量主要提供了以下构造函数:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)        // 第二个参数可以指定是否公平

在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑如下:

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
  • <span style="font-size:">acquire()</span> 方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
  • <span style="font-size:">acquireUninterruptibly()</span> 方法和 <span style="font-size:">acquire()</span> 方法类似,但是不响应中断。
  • <span style="font-size:">tryAcquire()</span> 尝试获得一个许可,如果成功则返回 true,失败则返回 false,它不会进行等待,立即返回。
  • <span style="font-size:">release()</span> 用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。

在 JDK 的官方 Javadoc 中,就有一个有关信号量使用的简单实例,有兴趣的读者可以自行去翻阅一下,这里给出一个更傻瓜化的例子:

public class SemapDemo implements Runnable {

    final Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            // 模拟耗时操作
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
            semaphore.release();
        } catch (InterruptedException ignore) {
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
    }
}

执行程序,就会发现系统以 5 个线程为单位,依次输出带有线程 ID 的提示文本。

在实现上,Semaphore 借助了线程同步框架 AQS(AbstractQueuedSynchornizer),同样借助了 AQS 来实现的是 Java 中可重入锁的实现。

AQS 的强大之处在于,你仅仅需要继承它,然后使用它提供的 api 就可以实现任意复杂的线程同步方案,AQS 为我们做了大部分的同步工作,所以这里不细说,之后再来详细探究一下…

我等着你:Thread.join()

如果一个线程 A 执行了 <span style="font-size:">thread.join()</span> 方法,其含义是:当前线程 A 等待 thread 线程终止之后才从 thread.join() 返回。线程 Thread 除了提供 <span style="font-size:">join()</span> 方法之外,还提供了 <span style="font-size:">join(long millis)</span><span style="font-size:">join(long millis, int nanos)</span> 两个具备超时特性的方法。这两个超时方法表示,如果线程 Thread 在给定的超时时间里没有终止,那么将会从该超时方法中返回。

在下面的代码中,我们创建了 10 个线程,编号 0 ~ 9,每个线程调用前一个线程的 <span style="font-size:">join()</span>方法,也就是线程 0 结束了,线程 1 才能从 <span style="font-size:">join()</span> 方法中返回,而线程 0 需要等待 main 线程结束。

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate. ");
    }

    static class Domino implements Runnable {

        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException ignore) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate. ");
        }
    }
}

运行程序,可以看到下列输出:

main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.

说明每个线程终止的前提都是前驱线程的终止,每个线程等待前驱线程结束后,才从 <span style="font-size:">join()</span>方法中返回,这里涉及了等待/ 通知机制,在 JDK 的源码中,我们可以看到 <span style="font-size:">join()</span> 的方法如下:

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        // 条件不满足则继续等待
        while (isAlive()) {
            wait(0);
        }
        // 条件符合则返回
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

当线程终止时,会调用线程自身的 notifyAll() 方法,会通知所有等待在该线程对象上的线程。可以看到 <span style="font-size:">join()</span> 方法的逻辑结构跟我们上面写的生产者消费者类似,即加锁、循环和处理逻辑三个步骤。

三、线程之间的数据交互


保证可见性:volatile 关键字

我们先从一个有趣的例子入手:

private static boolean isOver = false;

public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
        while (!isOver) {
        }
        System.out.println("线程已感知到 isOver 置为 true,线程正常返回!");
    });
    thread.start();
    Thread.sleep(500);
    isOver = true;
    System.out.println("isOver 已置为 true");
}

我们开启了一个主线程和一个子线程,我们期望子线程能够感知到 <span style="font-size:">isOver</span> 变量的变化以结束掉死循环正常返回,但是运行程序却发现并不是像我们期望的那样发生,子线程一直处在了死循环的状态!

为什么会这样呢?

Java 内存模型

关于这一点,我们有几点需要说明,首先需要搞懂 Java 的内存模型:

【万字长文】高并发中的线程通信,看这一篇就够了!Java 虚拟机规范中试图定义一种 Java 内存模型(Java Memory Model, JMM)来屏蔽掉各层硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

Java 内存模型规定了所有的变量都存储在主内存(Main Memory)中。每条线程还有自己的工作内存(Working Memory),线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在主内存中进行,而不能直接读写主内存中的变量。不同的线程之间也无法直接访问对方工作内存中的变量,线程间的变量值的传递均需要通过主内存来完成,线程、主内存、工作内存三者的关系如上图。

那么不同的线程之间是如何通信的呢?

共享内存的并发模型里,线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信,典型的共享内存通信方式就是通过共享对象进行通信。

【万字长文】高并发中的线程通信,看这一篇就够了!例如上图线程 A 与 线程 B 之间如果要通信的话,那么就必须经历下面两个步骤:

  1. 首先,线程 A 把本地内存 A 更新过的共享变量刷新到主内存中去
  2. 然后,线程 B 到主内存中去读取线程 A 之前更新过的共享变量

【万字长文】高并发中的线程通信,看这一篇就够了!在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过明确的发送消息来显式进行通信,在 Java 中典型的消息传递方式就是 <span style="font-size:">wait()</span><span style="font-size:">notify()</span>

说回刚才出现的问题,就很容易理解了:每个线程都有独占的内存区域,如操作栈、本地变量表等。

线程本地保存了引用变量在堆内存中的副本,线程对变量的所有操作都在本地内存区域中进行,执行结束后再同步到堆内存中去

也就是说,我们在主线程中修改的 <span style="font-size:">isOver</span> 的值并没有被子线程读取到(没有被刷入主内存),也就造成了子线程对于 <span style="font-size:">isOver</span> 变量不可见。

解决方法也很简单,只需要在 <span style="font-size:">isOver</span> 变量前加入 <span style="font-size:">volatile</span> 关键字就可以了,这是因为加入了 volatile 修饰的变量允许直接与主内存交互,进行读写操作,保证可见性。

指令重排/ happen-before 原则

再从另一个有趣的例子中入手,这是在高并发场景下会存在的问题:

class LazyInitDemo {
    private static TransationService service = null;

    public static TransationService getTransationService(){
        if (service == null) {
            synchronized (this) {
                if (service == null) {
                    service = new TransationService();
                }
            }
        }
    }
}

这是一个典型的双重检查锁定思想,这段代码也是一个典型的双重检查锁定(Double-checked Locking)问题。在高并发的情况下,该对象引用在没有同步的情况下进行读写操作,导致用户可能会获取未构造完成的对象

这是因为指令优化的结果。计算机不会根据代码顺序按部就班地执行相关指令,我们来举一个借书的例子:假如你要去还书并且想要借一个《高并发编程学习》系列丛书,而你的室友恰好也要还书,并且还想让你帮忙借一本《Java 从入门到放弃》。

这件事乍一看有两件事:你的事和你室友的事。先办完你的事,再开始处理你室友的事情是属于单线程的死板行为,此时你会潜意识地进行「优化」,例如你可以把你要还的书和你室友需要还的书一起还了,再一起把想要借的书借出来,这其实就相当于合并数据进行存取的操作过程了。

我们知道一条指令的执行是可以分成很多步骤的,简单地说,可以分为:

  • 取值 IF
  • 译码和去寄存器操作数 ID
  • 执行或者有效地址计算 EX
  • 存储器访问 MEM
  • 写回 WB

由于每一个步骤可能使用不同的硬件完成,因此,聪明的工程师就发明了流水线技术来执行指令,如下图所示:

【万字长文】高并发中的线程通信,看这一篇就够了!可以看到,当第 2 条指令执行时,第 1 条执行其实并没有执行完,确切地说第一条指令还没有开始执行,只是刚刚完成了取值操作而已。

这样的好处非常明显,假如这里每一个步骤都需要花费 1 毫秒,那么指令 2 等待指令 1 完全执行后再执行,则需要等待 5 毫秒,而使用流水线指令,指令 2 只需要等待 1 毫秒就可以执行了。如此大的性能提升,当然让人眼红。

回到最初的问题,我们分析一下:对于 Java 编译器来说,初始化 TransactionService 实例和将对象地址写到 service 字段并非原子操作,且这两个阶段的执行顺序是未定义的

加入某个线程执行 <span style="font-size:">new TransactionService()</span> 时,构造方法还未被调用,编译器仅仅为该对象分配了内存空间并设为默认值,此时若另一个线程调用 <span style="font-size:">getTransactionService()</span> 方法,由于 <span style="font-size:">service != null</span>,但是此时 <span style="font-size:">service</span> 对象还没有被赋予真正的有效值,从而无法取到正确的 <span style="font-size:">service</span> 单例对象。

对于此问题,一种较为简单的解决方案就是用 <span style="font-size:">volatile</span> 关键字修饰目标属性(适用于 JDK5 及以上版本),这样 <span style="font-size:">service</span> 就限制了编译器对它的相关读写操作,对它的读写操作进行指令重排,确定对象实例化之后才返回引用。

另外指令重排也有自己的规则,并非所有的指令都可以随意改变执行位置,下面列举一下基本的原则:

  • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;
  • 锁定规则:一个 unLock 操作先行发生于后面对同一个锁的 lock 操作;
  • volatile 变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;
  • 传递规则:如果操作 A 先行发生于操作 B,而操作 B 又先行发生于操作 C,则可以得出操作 A 先行发生于操作 C;
  • 线程启动规则:Thread 对象的 <span style="font-size:">start()</span> 方法先行发生于此线程的每个一个动作;
  • 线程中断规则:对线程 <span style="font-size:">interrupt()</span> 方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
  • 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过 <span style="font-size:">Thread.join()</span> 方法结束、<span style="font-size:">Thread.isAlive()</span> 的返回值手段检测到线程已经终止执行;
  • 对象终结规则:一个对象的初始化完成先行发生于他的 <span style="font-size:">finalize()</span> 方法的开始;

volatile 不保证原子性

volatile 解决的是多线程共享变量的可见性问题,类似于 synchronized,但不具备 synchronized 的互斥性。所以对 volatile 变量的操作并非都具有原子性,例如我们用下面的例子来说明:

public class VolatileNotAtomic {

    private static volatile long count = 0L;
    private static final int NUMBER = 10000;

    public static void main(String[] args) {
        Thread subtractThread = new SubstractThread();
        subtractThread.start();

        for (int i = 0; i < NUMBER; i++) {
            count++;
        }

        // 等待减法线程结束
        while (subtractThread.isAlive()) {
        }

        System.out.println("count 最后的值为: " + count);
    }

    private static class SubstractThread extends Thread {

        @Override
        public void run() {
            for (int i = 0; i < NUMBER; i++) {
                count--;
            }
        }
    }
}

多次执行后,发现结果基本都不为 0。只有在 <span style="font-size:">count++</span><span style="font-size:">count--</span> 两处都进行加锁时,才能正确的返回 0,了解 Java 的童鞋都应该知道这 <span style="font-size:">count++</span><span style="font-size:">count--</span> 都不是一个原子操作,这里就不作说明了。

volatile 的使用优化

在了解一点吧,著名的并发编程大师 Doug lea 在 JDK 7 的并发包里新增一个队列集合类 LinkedTransferQueue,它在使用 volatile 变量时,用一种追加字节的方式来优化对列出队和入队的性能,具体的可以看一下下列的链接,这里就不具体说明了。

保证原子性:synchronized

Java 中任何一个对象都有一个唯一与之关联的锁,这样的锁作为该对象的一系列标志位存储在对象信息的头部。Java 对象头里的 Mark Word 里默认的存放的对象的 Hashcode/ 分代年龄和锁标记位。32 为 JVM Mark Word 默认存储结构如下:

【万字长文】高并发中的线程通信,看这一篇就够了!Java SE 1.6 中,锁一共有 4 种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。

偏向锁

HotSpot 的作者经过研究发现,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。

  • 偏向锁的获取:当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID,以后该线程在进入和退出同步块时不需要进行 CAS 操作来加锁和解锁,只需简单地测试一下对象头的 Mark Word 里是否存储着指向当前线程的偏向锁。

    如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下 Mark Word 中偏向锁的标识是否设置成 1(表示当前是偏向锁),如果没有设置,则使用 CAS 竞争锁;如果设置了,则尝试使用 CAS 将对象头的偏向锁指向当前线程。

  • 偏向锁的撤销:偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。

下图线程 1 展示了偏向锁获取的过程,线程 2 展示了偏向锁撤销的过程。

【万字长文】高并发中的线程通信,看这一篇就够了!### 轻量级锁和自旋锁

如果偏向锁失败,虚拟机并不会立即挂起线程。它还会使用一种称为轻量级锁的优化手段。

线程在执行同步块之前,JVM 会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的 Mark Word 复制到锁记录中,官方称为 Displaced Mark Word。然后线程尝试使用 CAS 将对象头中的 Mark Word 替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋(自己执行几个空循环再进行尝试)来获取锁。

轻量级解锁时,会使用原子的 CAS 操作将 Displaced Mark Word 替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。下图是两个线程同时争夺锁,导致锁膨胀的流程图。

【万字长文】高并发中的线程通信,看这一篇就够了!### 几种锁的比较

下图就简单概括了一下几种锁的比较:

【万字长文】高并发中的线程通信,看这一篇就够了!## 每人一支笔:ThreadLocal

除了控制资源的访问外,我们还可以通过增加资源来保证所有对象的线程安全。比如,让 100 个人填写个人信息表,如果只有一支笔,那么大家就得挨个写,对于管理人员来说,必须保证大家不会去哄抢这仅存的一支笔,否则,谁也填不完。从另外一个角度出发,我们可以干脆就准备 100 支笔,那么所有人都可以各自为营,很快就能完成表格的填写工作。

如果说锁是使用第一种思路,那么 ThreadLocal 就是使用第二种思路了。

当使用 ThreadLocal 维护变量时,其为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立的改变自己的副本,而不会影响其他线程对应的副本。

ThreadLocal 内部实现机制

【万字长文】高并发中的线程通信,看这一篇就够了!1. 每个线程内部都会维护一个类似 HashMap 的对象,称为 ThreadLocalMap,里边会包含若干了 Entry(K-V 键值对),相应的线程被称为这些 Entry 的属主线程;

  1. Entry 的 Key 是一个 ThreadLocal 实例,Value 是一个线程特有对象。Entry 的作用即是:为其属主线程建立起一个 ThreadLocal 实例与一个线程特有对象之间的对应关系;
  2. Entry 对 Key 的引用是弱引用;Entry 对 Value 的引用是强引用。

ThreadLodal 的副作用

为了让线程安全地共享某个变量,JDK 开出了 ThreadLocal 这副药方,但「是药三分毒」,ThreadLocal 也有一定的副作用。主要问题是「产生脏数据」和「内存泄漏」。这两个问题通常是在线程池中使用 ThreadLocal 引发的,因为线程池有 「线程复用」「内存常驻」两个特点。

脏数据

线程复用会产生脏数据。由于线程池会重用 Thread 对象,那么与 Thread 绑定的类的静态属性 ThreadLocal 变量也会被重用。如果在实现的线程 <span style="font-size:">run()</span> 方法中不显式地 <span style="font-size:">remove()</span>清理与线程相关的 ThreadLocal 信息,那么倘若下一个线程不调用 <span style="font-size:">set()</span> 设置初始值,就可能 <span style="font-size:">get()</span> 到重用的线程信息,包括 ThreadLocal 所关联的线程对象的 value 值。

为了方便理解,用一段简要代码来模拟,如下所示:

public class DirtyDataInThreadLocal {

    public static ThreadLocalthreadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        // 使用固定大小为 1 的线程池,说明上一个的线程属性会被下一个线程属性复用
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 2; i++) {
            Mythread mythread = new Mythread();
            pool.execute(mythread);
        }
    }

    private static class Mythread extends Thread {

        private static boolean flag = true;

        @Override
        public void run() {
            if (flag) {
                // 第 1 个线程 set 后,并没有进行 remove
                // 而第二个线程由于某种原因没有进行 set 操作
                threadLocal.set(this.getName() + ", session info.");
                flag = false;
            }
            System.out.println(this.getName() + " 线程是 " + threadLocal.get());
        }
    }
}

执行结果:

Thread-0 线程是 Thread-0, session info.
Thread-1 线程是 Thread-0, session info.

内存泄漏

在源码注释中提示使用 static 关键字来修饰 ThreadLocal。在此场景下,寄希望于 ThreadLocal 对象失去引用后,触发弱引用机制来回首 Entry 的 Value 就变得不现实了。在上面的例子中,如果不进行 <span style="font-size:">remove()</span> 操作,那么这个线程执行完成后,通过 ThreadLocal 对象持有的 String 对象是不会被释放的。

以上两个问题的解决办法很简单,就是在每次使用完 ThreadLocal 时,必须要及时调用 <span style="font-size:">remove()</span> 方法清理。

参考资料


  1. 《Java 零基础入门教程》 – http://study.163.com/course/courseMain.htm?courseId=1003108028[5]
  2. 《Java 并发编程的艺术》
  3. 《码出高效 Java 开发手册》 – 杨冠宝(孤尽) 高海慧(鸣莎)著
  4. Java 面试知识点解析(二)——高并发编程篇 – https://www.wmyskxz.com/2018/05/10/java-mian-shi-zhi-shi-dian-jie-xi-er-gao-bing-fa-bian-cheng-pian/[6]
  5. 让你彻底理解 Synchronized – https://www.jianshu.com/p/d53bf830fa09[7]
  6. 《Offer 来了 – Java 面试核心知识点精讲》 – 王磊 编著
  7. 《实战 Java 高并发程序设计》 – 葛一鸣 郭超 编著

END

1. 本站所有资源来源于用户上传和网络,如有侵权请及时联系删除,本站不承担任何法律责任!
2. 分享目的仅供大家学习和研究,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的教程、源码等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"www.94zyw.com",如遇到无法解压的请联系管理员!
94资源网 » 高并发中的线程通信