概述

通过 wait()notifyAll()实现了生产者和消费者之间的通讯

还可以通过 Condition接口来实现,代码格式基本上是一致的


wait()/notify()Condition的两个区别:

  • 一个是操作系统实现的,一个是通过CAS实现的
  • 一个是通过wait和notify实现,那么对应的就只有一个等待队列,而通过Condition,可以创建两个队列,一个是生产者等待队列,一个是消费者等待队列,每次只需要唤醒对应的队列即可,无需一次性唤醒所有队列

区别在于: 通过Condition接口,可以实现让生产者在一个队列中等待,而消费者在另一个队列中等待,

wait/notify-简单Demo

public class TestProviderAndConsumer {
    //商品数量
    static int goodsSize=0;
    //锁
    static Object lock=new Object();

    public static void main(String[] args) {
        //启动生产者
        Provider provider = new Provider();
        new Thread(provider,"生产者A").start();
        new Thread(provider,"生产者B").start();
        new Thread(provider,"生产者C").start();

        //启动消费者
        Consumer consumer = new Consumer();
        new Thread(consumer,"消费者A").start();
        new Thread(consumer,"消费者B").start();
    }

    //生产者
    static class Provider implements Runnable{
        @Override
        public void run() {
            //模拟不断生产
            while(true){
                try {
                    //假设生产一个商品需要500毫秒
                    Thread.sleep(500);

                    synchronized (lock){
                        //如果数量满了,则停止生产,并通知消费者开始消费
                        if(goodsSize==10){
                            //唤醒其他所有线程(如果唤醒了生产者线程,首先需要得到lock锁,然后得到CPU时间片,那时如果商品数量还为10,那么线程还会被调用wait方法,从而继续等待)
                            lock.notifyAll();
                            System.out.println(Thread.currentThread().getName()+":数量已满,停止生产");
                            //线程等待,并释放锁
                            lock.wait();
                        }
                        //如果数量未满,则继续生产
                        else{
                            goodsSize++;
                            System.out.println(Thread.currentThread().getName()+"生产一个商品,目前数量为:"+goodsSize);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //消费者
    static class Consumer implements Runnable{
        @Override
        public void run() {
            //模拟不断消费
            while(true){
                try {
                    //假设消费一个商品需要200毫秒
                    Thread.sleep(200);

                    synchronized (lock){
                        //如果数量为0,则停止消费,并通知生产者开始生产
                        if(goodsSize==0){
                            //唤醒其他所有线程(如果唤醒了消费者线程,首先需要得到lock锁,然后得到CPU时间片,那时如果商品数量还为0,那么线程还会被调用wait方法,从而继续等待)
                            lock.notifyAll();
                            System.out.println(Thread.currentThread().getName()+":数量为0,停止消费");
                            //线程等待,并释放锁
                            lock.wait();
                        }
                        //如果数量不为0,则继续消费
                        else{
                            goodsSize--;
                            System.out.println(Thread.currentThread().getName()+"消费一个商品,目前数量为:"+goodsSize);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

输出结果片段:

生产者B生产一个商品,目前数量为:8
生产者C生产一个商品,目前数量为:9
生产者A生产一个商品,目前数量为:10
消费者B消费一个商品,目前数量为:9
消费者B消费一个商品,目前数量为:8
生产者B生产一个商品,目前数量为:9
生产者A生产一个商品,目前数量为:10
生产者C:数量已满,停止生产
消费者B消费一个商品,目前数量为:9
消费者A消费一个商品,目前数量为:8
消费者B消费一个商品,目前数量为:7
消费者A消费一个商品,目前数量为:6
消费者B消费一个商品,目前数量为:5
生产者B生产一个商品,目前数量为:6
生产者A生产一个商品,目前数量为:7
消费者A消费一个商品,目前数量为:6
消费者B消费一个商品,目前数量为:5
消费者A消费一个商品,目前数量为:4
消费者B消费一个商品,目前数量为:3
生产者B生产一个商品,目前数量为:4
生产者A生产一个商品,目前数量为:5
消费者A消费一个商品,目前数量为:4
消费者B消费一个商品,目前数量为:3
消费者A消费一个商品,目前数量为:2
消费者B消费一个商品,目前数量为:1
消费者A消费一个商品,目前数量为:0
消费者B:数量为0,停止消费
生产者A生产一个商品,目前数量为:1
生产者B生产一个商品,目前数量为:2
消费者A消费一个商品,目前数量为:1
消费者A消费一个商品,目前数量为:0
生产者C生产一个商品,目前数量为:1
生产者A生产一个商品,目前数量为:2

Condition-简单Demo

public class TestProviderAndConsumer2 {
    //商品数量
    static int goodsSize=0;
    //锁
    static ReentrantLock lock=new ReentrantLock();

    //生产者等待队列
    static Condition product_waitQueue=lock.newCondition();
    //消费者等待队列
    static Condition consumer_waitQueue=lock.newCondition();

    public static void main(String[] args) {
        //启动生产者
        Provider provider = new Provider();
        new Thread(provider,"生产者A").start();
        new Thread(provider,"生产者B").start();
        new Thread(provider,"生产者C").start();

        //启动消费者
        Consumer consumer = new Consumer();
        new Thread(consumer,"消费者A").start();
        new Thread(consumer,"消费者B").start();
    }

    //生产者
    static class Provider implements Runnable{
        @Override
        public void run() {
            //模拟不断生产
            while(true){
                lock.lock();
                try {
                    //假设生产一个商品需要500毫秒
                    Thread.sleep(500);

                    //如果数量满了,则停止生产,并通知消费者开始消费
                    if(goodsSize==10){
                        System.out.println(Thread.currentThread().getName()+":数量已满,停止生产");
                        //唤醒所有的消费者
                        consumer_waitQueue.signalAll();
                        //让生产者等待
                        product_waitQueue.await();
                    }
                    //如果数量未满,则继续生产
                    else{
                        goodsSize++;
                        System.out.println(Thread.currentThread().getName()+"生产一个商品,目前数量为:"+goodsSize);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }

            }
        }
    }

    //消费者
    static class Consumer implements Runnable{
        @Override
        public void run() {
            //模拟不断消费
            while(true){
                lock.lock();
                try {
                    //假设消费一个商品需要200毫秒
                    Thread.sleep(200);

                    //如果数量为0,则停止消费,并通知生产者开始生产
                    if(goodsSize==0){
                        System.out.println(Thread.currentThread().getName()+":数量为0,停止消费");
                        //唤醒所有的生产者
                        product_waitQueue.signalAll();
                        //让消费者等待
                        consumer_waitQueue.await();
                    }
                    //如果数量不为0,则继续消费
                    else{
                        goodsSize--;
                        System.out.println(Thread.currentThread().getName()+"消费一个商品,目前数量为:"+goodsSize);
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}
0条评论
头像
ICP证 : 浙ICP备18021271号