管程模型与等待通知机制

管程定义

Monitor,直译为监视器,操作系统领域意译为“管程”。管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发

Java 在 1.5 之前仅仅提供了 synchronized 关键字及 wait()、notify()、notifyAll()三个方法,他们并不是提供信号量这种编程原语。Java使用管程技术,这四者都是管程的组成部分,管程与信号量是等价的,可以互相实现。

管程如何实现互斥?

管程模型中,将共享变量和对共享变量的操作封装,访问共享变量必须通过管程提供的方法来实现,方法保证了互斥性,只允许一个线程进入管程,与面向对象思想契合。

管程如何实现同步?

  1. 管程模型是封装的,多个线程试图进入管程,只允许一个线程进入,其它线程在入口等待队列中等待。

  2. 管程里引入了条件变量的概念,每个条件变量都有一个条件变量等待队列

管程通过条件变量等待队列来实现同步。当线程T1进入管程后,发现不满足条件变量时,会进入条件变量等待队列,此时允许其它线程进入管程。当线程T2执行某操作,使条件变量满足,此时T2要通知T1,T1收到通知后从条件变量等待队列出来,重新进入入口等待队列等待。

其中等待过程通过wait()实现,如果用对象A表示某个条件,则线程T1需要调用A.wait()。同理条件满足时,T2通知T1过程,线程T2调用A.notify()来通知 A 等待队列中的一个线程,此时该队列只有T1一个线程。notifyAll()通知条件变量等待队列里的全部线程。

MESA模型编程范式:while循环调用wait()

MESA 管程特有的编程范式:需要在一个 while 循环里面调用wait()

while(条件不满足) {
  wait();
}

MESA 管程中,T2 通知完 T1 后,T2 会接着执行,T1 并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是 notify() 不用放到代码的最后(如Hasen模型),T2 也没有多余的阻塞唤醒操作(如Hoare模型)。

但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。

Java语言内置管程:synchronized

Java语言内置的管程(synchronized)对 MESA 模型进行了精简。MESA 模型条件变量可以有多个,synchronized 关键字修饰的代码块,在编译期会自动生成相关加锁和解锁的代码,但是仅支持一个条件变量。

面试题:为什么wait()方法是Object类实现的而不是Thread类?

这就需要联系管程模型分析,使用wait()方法的目的是实现线程的同步,他们是通过条件变量的等待队列来实现等待和通知,作用于条件变量上实现的同步而不是作用于线程上。

面试题:为什么wait()方法必须放在同步块中?

前提:wait()方法不在同步块中,代码会抛出IllegalMonitorStateExeception。

wait()方法放在同步块,是为了解决多线程下Lost wake-up问题,即无效唤醒,调用notify()的时候条件还未执行到wait()状态。

假设两个线程,一个生产者一个消费者,生产者伪代码:

count+1;
notify();

消费者伪代码:

while(count<=0)
   wait()
count--

若没在同步块中,存在这种情况:当初始值为0,消费者先检查count值,还未进入等待,此时发生了线程切换,生产者执行count+1和通知操作,而此时消费者还未进入等待状态为无效唤醒,丢弃了生产者的通知。

这里的问题就在于竞态条件,在消费者检查count值和wait()之间,count值可能被更改,所以需要让生产者和消费者竞争锁,拿到锁才能修改count值。同时注意必须处于锁对象的同步块。如obj.notify(),则需要处于synchronized(obj)同步块中。

Java版本手写生产者消费者模型:

    Integer count = 0;
    Object obj = new Object();

    private void produce() {
        while (true) {
            synchronized (obj) {
                while (count == 10) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count++;
                System.out.println("生产后count = " + count);
                obj.notifyAll();
            }
        }
    }

    private void consume() {
        while (true) {
            synchronized (obj) {
                while (count <= 0) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                count--;
                System.out.println("消费后count = " + count);
                obj.notifyAll();
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        Thread t1 = new Thread(()->produce());
        Thread t2 = new Thread(()->consume());
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

参考:

《Java并发编程实战》

https://www.jianshu.com/p/b8073a6ce1c0

Last updated

Was this helpful?