博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java多线程:队列与阻塞队列
阅读量:4947 次
发布时间:2019-06-11

本文共 5211 字,大约阅读时间需要 17 分钟。

1. 什么是阻塞队列

阻塞队列(BlockingQueue)是 Java 5 并发新特性中的内容,阻塞队列的接口是 java.util.concurrent.BlockingQueue,它提供了两个附加操作:当队列中为空时,从队列中获取元素的操作将被阻塞;当队列满时,向队列中添加元素的操作将被阻塞。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器。

阻塞队列提供了四种操作方法:

  • 抛出异常:当队列满时,再向队列中插入元素,则会抛出IllegalStateException异常。当队列空时,再向队列中获取元素,则会抛出NoSuchElementException异常。
  • 返回特殊值:当队列满时,向队列中添加元素,则返回false,否则返回true。当队列为空时,向队列中获取元素,则返回null,否则返回元素。
  • 一直阻塞:当阻塞队列满时,如果生产者向队列中插入元素,则队列会一直阻塞当前线程,直到队列可用或响应中断退出。当阻塞队列为空时,如果消费者线程向阻塞队列中获取数据,则队列会一直阻塞当前线程,直到队列空闲或响应中断退出。
  • 超时退出:当队列满时,如果生产线程向队列中添加元素,则队列会阻塞生产线程一段时间,超过指定的时间则退出返回false。当队列为空时,消费线程从队列中移除元素,则队列会阻塞一段时间,如果超过指定时间退出返回null。

2. Java中的阻塞队列

JDK7提供了7个阻塞队列。分别是

下面分别简单介绍一下:

  1. ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

  2. LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
  3. PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
  4. DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
  5. SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
  6. LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
  7. LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

Java中线程安全的内置队列还有两个:ConcurrentLinkedQueue和LinkedTransferQueue,它们使用了CAS这种无锁的方式来实现了线程安全的队列。无锁的方式性能好,但是队列是无界的,用在生产系统中,生产者生产速度过快,可能导致内存溢出。有界的阻塞队列ArrayBlockingQueue和LinkedBlockingQueue,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样的话就只剩下ArrayBlockingQueue。(先埋个坑在这儿,近来接触到了disruptor,感觉妙不可言。)

3. 阻塞队列的实现原理

这里分析下ArrayBlockingQueue的实现原理。

构造方法:

ArrayBlockingQueue(int capacity);ArrayBlockingQueue(int capacity, boolean fair);ArrayBlockingQueue(int capacity, boolean fair, Collection
c)

 

ArrayBlockingQueue提供了三种构造方法,参数含义如下:

  • capacity:容量,即队列大小。
  • fair:是否公平锁。
  • c:队列初始化元素,顺序按照Collection遍历顺序。

插入元素

public void put(E e) throws InterruptedException {    checkNotNull(e);    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {        while (count == items.length)            notFull.await();        enqueue(e);    } finally {        lock.unlock();    }}

 

从源码可以看出,生产者首先获得锁lock,然后判断队列是否已经满了,如果满了,则等待,直到被唤醒,然后调用enqueue插入元素。

private void enqueue(E x) {    // assert lock.getHoldCount() == 1;    // assert items[putIndex] == null;    final Object[] items = this.items;    items[putIndex] = x;    if (++putIndex == items.length)        putIndex = 0;    count++;    notEmpty.signal();}

 

以上是enqueue的实现,实现的操作是插入元素到一个环形数组,然后唤醒notEmpty上阻塞的线程。

获取元素

public E take() throws InterruptedException {    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {        while (count == 0)            notEmpty.await();        return dequeue();    } finally {        lock.unlock();    }}

 

从源码可以看出,消费者首先获得锁,然后判断队列是否为空,为空,则等待,直到被唤醒,然后调用dequeue获取元素。

private E dequeue() {    // assert lock.getHoldCount() == 1;    // assert items[takeIndex] != null;    final Object[] items = this.items;    @SuppressWarnings("unchecked")    E x = (E) items[takeIndex];    items[takeIndex] = null;    if (++takeIndex == items.length)        takeIndex = 0;    count--;    if (itrs != null)        itrs.elementDequeued();    notFull.signal();    return x;}

 

以上是dequeue的实现,获取环形数组当前takeIndex的元素,并及时将当前元素置为null,设置下一次takeIndex的值takeIndex++,然后唤醒notFull上阻塞的线程。

还有其他方法offer(E e)poll()add(E e)remove()、 offer(E e, long timeout, TimeUnit unit)等的实现,因为常用take和put,这些方法就不一一赘述了。

4. 阻塞队列的基本使用

使用阻塞队列实现生产者-消费者模式:

/** * Created by noly on 2017/5/19. */public class BlockingQueueTest {    public static void main (String[] args) {        ArrayBlockingQueue
queue = new ArrayBlockingQueue
(10); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); producer.start(); consumer.start(); }}class Consumer extends Thread { private ArrayBlockingQueue
queue; public Consumer(ArrayBlockingQueue
queue){ this.queue = queue; } @Override public void run() { while(true) { try { Integer i = queue.take(); System.out.println("消费者从队列取出元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Producer extends Thread { private ArrayBlockingQueue
queue; public Producer(ArrayBlockingQueue
queue){ this.queue = queue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { queue.put(i); System.out.println("生产者向队列插入元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } }}

 

如果不使用阻塞队列,使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式,考虑线程间的通讯,会非常麻烦。

参考资料:

    

    

    

    

转载于:https://www.cnblogs.com/ITtangtang/p/7607280.html

你可能感兴趣的文章
The kth great number(hdu4006+优先队列)
查看>>
浅谈微信小程序
查看>>
Drools(BRMS) 速成教程(上)
查看>>
[svc]通过ssh tunnel连接内网ECS和RDS
查看>>
界面图片
查看>>
[翻译]深入理解Win32结构化异常处理(三)
查看>>
Java集合之HashMap
查看>>
SQL: Case when then
查看>>
sql 改字段名
查看>>
认识CSS3 transform 属性
查看>>
notepad++ 中配置python解释器
查看>>
压deadline的 py-Four fundamental operations of recursion
查看>>
python基础 文件操作
查看>>
量化自我—趋势还是忽悠
查看>>
SQLCODE=-668, SQLSTATE=57016, SQLERRMC=7
查看>>
大半夜的很无聊,想写个计算机的遥控器
查看>>
△POJ1328--Radar Installation(贪心)
查看>>
数据结构实验7:实现二分查找、二叉排序(查找)树和AVL树
查看>>
JAVA正则表达式:Pattern类与Matcher类详解(转)
查看>>
ls -alrth 及ls 详解
查看>>