面试怎么说—阻塞队列

什么是阻塞队列

阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒
为什么需要BlockingQueue,好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

img

从上述的定义不难发现,这实际上就是一个“生产者 - 消费者模型”,是的,我们可以使用阻塞队列,轻松实现一个“生产者 - 消费者模型”。

阻塞队列的API

image-20191107104914903

常用API

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列

  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。

  3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

  4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。

  5. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列

    SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

  6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。

  7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

阻塞方法

  1. put:把对象加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
  2. take:如果取到则返回,如果没有取到,则一直阻塞,直到队列不为空

抛出异常

  1. Add:如果队列的容量已满,在添加元素就会抛出Queue full异常
  2. Remove: 如果队列没有元素,则抛出NoSuchElement异常;如果是删除某一个元素,如果元素不存在,则不会抛出异常,而是返回false.
  3. Element: 如果队列为空,则抛出NoSuchElement异常;如果队不为空,则返回第一个元素。

返回特殊值

  1. offer(anObject): 如果可能的话,将元素添加到队列,返回true,如果队列已满,则不添加,返回false.不阻塞执行线程
  2. poll: 从队列返回第一个元素,如果没有返回null
  3. peek: 从队列返回第一个元素,如果没有返回null

超时退出

  1. offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列。加入BlockingQueue,则返回失败。
  2. poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

阻塞队列的应用

实现生产者消费者

传统方式实现生产者消费者

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareData{  //资源类
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        lock.lock();
        try{
            //        1.判断
            while(number !=0){
//            等待,不能生产
                condition.await();
            }
            //2.干活
            number++;
            System.out.println(Thread.currentThread().getName()+"\t"+number);
            //3.通知唤醒
            condition.signalAll();
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    public void decrement() throws Exception{
        lock.lock();
        try{
            //        1.判断
            while(number ==0){
//            等待,不能生产
                condition.await();
            }
            //2.干活
            number--;
            System.out.println(Thread.currentThread().getName()+"\t"+number);
            //3.通知唤醒
            condition.signalAll();
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

}

/*
* 题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
*
* 1.线程操作资源类
* 2.判断 干活 通知
* 3.防止虚假唤醒机制
* */
public class ProdConsumer_TraditionDemo {
    public static void main(String[] args){
        ShareData shareData = new ShareData();

        new Thread(()->{
            for(int i=1;i<=5;i++){
                try {
                    shareData.increment();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        },"AAA").start();

        new Thread(()->{
            for(int i=1;i<=5;i++){
                try {
                    shareData.decrement();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        },"BBB").start();
    }
}

阻塞队列实现生产者消费者

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private volatile boolean FLAG = true;//默认开启,进行生产+消费
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws Exception{
        String data = null;
        boolean retValue;
        while(FLAG){
            data = atomicInteger.incrementAndGet()+"";
            retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS);
            if(retValue){
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"\t生产停止");
    }

    public void myConsumer() throws Exception{
        String result = null;
        while(FLAG){
            result = blockingQueue.poll(2L,TimeUnit.SECONDS);
            if(null==result || result.equalsIgnoreCase("")){
                FLAG = false;
                System.out.println(Thread.currentThread().getName()+"\t 超过2秒,消费退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
        }
    }

    public void stop() throws Exception{
        this.FLAG = false;
    }
}

/*
* volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用
* */

public class ProdConsumer_BlockQueueDemo {
    public static void main(String[] args) throws Exception{
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
            System.out.println();
            System.out.println();
            try{
                myResource.myProd();
            }catch (Exception e){
                e.printStackTrace();
            }
        },"Prod").start();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
            try{
                myResource.myConsumer();
            }catch (Exception e){
                e.printStackTrace();
            }
        },"Consumer").start();

        try{TimeUnit.SECONDS.sleep(5);}catch (InterruptedException e){e.printStackTrace();}

        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("5秒钟到,main停止");
        myResource.stop();
    }
}