博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
死磕java concurrent包系列(五)基于AQS的条件队列把LinkedBlockingQueue“扒光”
阅读量:6352 次
发布时间:2019-06-22

本文共 8170 字,大约阅读时间需要 27 分钟。

LinkedBlockingQueue的基础

LinkedBlockingQueue是一个基于链表的阻塞队列,实际使用上与ArrayBlockingQueue完全一样,我们只需要把之前烤鸡的例子中的Queue对象替换一下即可。如果对于ArrayBlockingQueue不熟悉,可以去看看https://juejin.im/post/5c0f79f3f265da61561f1bec

LinkedBlockingQueue源码分析

源码在node上注释写明了,它是基于一个“two lock queue”算法实现的,感兴趣的同学可以参考这篇paper: 这篇文章为了提升在多处理器的机器上的更好性能的并发而提出了这个算法,其中心思想是:通过两把锁分别控制并发,入队时:只需要锁Tail Node,出队时,只需要锁Head Node。 回到LinkedBlockingQueue,先看看内部成员变量:

public class LinkedBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /** * Linked list node class */ static class Node
{ E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node
next; Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ transient Node
head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node
last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();复制代码

每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点(这个node不同于AQS中的node,它是一个单向链表),其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。这里再次强调如果没有给LinkedBlockingQueue指定容量大小,其默认值将是Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。至于LinkedBlockingQueue的实现原理图与ArrayBlockingQueue是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的Condition条件对象作为等待队列,用于挂起take线程和put线程。 总结如下图:

LinkedBlockingQueue的阻塞添加

同样的,添加的方法主要有:add offer 和put。我们先看看非阻塞添加的add和offer方法,这两个方法的区别同样是添加失败时,add方法是抛异常,offer方法是返回false

public boolean add(E e) {     if (offer(e))         return true;     else         throw new IllegalStateException("Queue full");}复制代码
public boolean offer(E e) {        if (e == null) throw new NullPointerException();        //因为存在并发操作移出和入队互不冲突,与arrayBlockingQueue不同,count被声明为Atomic        final AtomicInteger count = this.count;        //队列满了直接返回        if (count.get() == capacity)            return false;        int c = -1;        Node
node = new Node
(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //因为存在并发问题,加锁之后再次判断一下队列有没有满 if (count.get() < capacity) { //入队 enqueue(node); //容量+1返回旧值 c = count.getAndIncrement(); //因为在入队时可能同时有出队的线程同时把元素移除,所以在入队后做一个补偿, //如果队列还有空间,那么唤醒一个如归的线程执行添加操作 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } //c==0,只有可能最开始就是一个空队列(注意上面的c返回的是旧值)此时因为刚好添加了一个元素, //所以唤醒消费的线程去取移出元素 if (c == 0) signalNotEmpty(); return c >= 0; }复制代码
//入队操作private void enqueue(Node
node) { //队列尾节点指向新的node节点 last = last.next = node;}//signalNotEmpty方法去唤醒移出元素的线程,为什么要先获取锁才能signal呢?不懂的同学回去看看AQS://因为条件队列是基于AQS的锁存在的,用法上必须要这么用,否则会抛出异常private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //唤醒获取并删除元素的线程 notEmpty.signal(); } finally { takeLock.unlock(); } }复制代码

这里的Offer()方法做了两件事:

  • 第一件事是判断队列是否满,满了就直接释放锁,没满就将节点封装成Node入队,然后加锁后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象notFull上的添加线程。
  • 第二件事是,判断是否需要唤醒等到在notEmpty条件对象上的消费线程。

接下来看看put方法,与offer方法如出一辙:

public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        int c = -1;        Node
node = new Node
(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //锁可被中断 putLock.lockInterruptibly(); try { //队列满时加入notFull条件队列 while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); //队列还没有满时,继续唤醒添加线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } //c==0,只有可能最开始就是一个空队列(注意上面的c返回的是旧值)此时因为刚好添加了一个元素, //所以唤醒消费的线程去取移出元素 if (c == 0) signalNotEmpty(); }复制代码

这里有几个问题:

问题1:

为什么添加完成后是继续唤醒在条件队列notFull上的添加线程而不是像ArrayBlockingQueue那样直接唤醒notEmpty条件对象上的消费线程?

分析1: 先回想一下ArrayBlockingQueue:它内部只有一个锁,在内部完成添加元素操作后直接唤醒消费线程去消费。如果ArrayBlockingQueue在添加元素之后再唤醒添加线程的话,消费的线程就可能一直被block,无法执行。 而为了避免这种情况,对于LinkedBlockingQueue来说,他有两个锁,添加和删除元素不是互斥的,添加的过程中可能已经删除好几个元素了,所以他在设计上要尽可能的去唤醒两个条件队列。 添加线程在队列没有满时自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起。注意消费线程的执行过程也是如此。这也是为什么LinkedBlockingQueue的吞吐量要相对大些的原因。

问题2: 为什么if (c == 0)时才去唤醒消费线程呢

分析2: 什么情况下c等于0呢?c值是添加元素前队列的大小,也就是说,之前是空队列,空队列时会有什么情况呢,空队列会阻塞所有的take进程,将其封装到notEmpty的条件队列中。这个时候,c之前是0,现在在执行了enqueue方法后,队列中有元素了,所以他需要立即唤醒阻塞的take进程,否则阻塞的take进程就一直block在队列里,一直沉睡下去。 为什么c>0时,就不会唤醒呢?因为take方法和put方法一样,take方法每次take完元素后,如果队列还有值,它会继续唤醒take队列,也就是说他只要没有被await()阻塞,他就会一直不断的唤醒take线程,而不需要再添加的时候再去唤醒,造成不必要的性能浪费

LinkedBlockingQueue的阻塞移出

相对的,我们再看看take方法:

public E take() throws InterruptedException {        E x;        int c = -1;        //获取当前队列大小        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();//可中断        try {            //如果队列没有数据,当前take线程到条件队列中            while (count.get() == 0) {                notEmpty.await();            }            //如果存在数据直接删除并返回该数据            x = dequeue();            c = count.getAndDecrement();//队列大小减1,返回之前的值            if (c > 1)                notEmpty.signal();//还有数据就唤醒后续的消费线程        } finally {            takeLock.unlock();        }        //满足条件(之前队列是满的,现在刚刚执行dequeue拿出了一个),        //唤醒条件对象上等待队列中的添加线程        if (c == capacity)            signalNotFull();        return x;    }private E dequeue() {        Node
h = head;//获取头结点 Node
first = h.next; //获取头结的下一个节点(要删除的节点) h.next = h; // help GC//自己next指向自己,即被删除 head = first;//更新头结点 E x = first.item;//获取删除节点的值 first.item = null;//清空数据,因为first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点 return x; }复制代码

take方法是一个可阻塞可中断的移除方法,主要做了两件事:

  • 如果队列没有数据就挂起当前线程到 notEmpty条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程;
  • 尝试唤醒条件对象notFull上等待队列中的添加线程:假设之前队列中满员了,那么新来的put进程将会被阻塞进notFull条件队列,然后await挂起沉睡。这个时候有线程通过take方法拿出了一个元素,如果此时不唤醒notFull条件队列,那么之前满员时队列中的线程就会一直睡死过去

总结

LinkedBlockingQueue的两个队列:

  • notFull条件队列(队列满时阻塞的put线程): await的时机:队列满了 signal的时机:一是put方法放入元素后,如果队列还有空位,会singal线程继续添加;二是如果队列最开始满员,take方法移出了一个元素后,队列还有一个空位时也会唤醒它。

  • notEmpty条件队列(队列空时候阻塞的take线程): await的时机:队列空了 signal的时机:一是take方法移出元素后,如果队列还有空位,会singal线程继续移出;二是如果队列最开始空的,put方法放入了一个元素后,队列还有一个元素时也会唤醒它。

这种算法就是“two lock queue”的设计思想,这也是LinkedBlockingQueue的吞吐量较高的本质原因

ArrayBlockingQueue和LinkedBlockingQueue的比较总结

通过上述的分析,对于LinkedBlockingQueue和ArrayBlockingQueue的基本使用以及内部实现原理我们已较为熟悉了,这里我们就对它们两间的区别来个小结

1.队列大小和构造方法有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题,有坑。

2.数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的单向链表。

3.从GC的角度分析:由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

4.两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

转载地址:http://fomla.baihongyu.com/

你可能感兴趣的文章
使用几种常用排序方法对C#数组进行排序的代码
查看>>
使用regasm注册.net com组件出现不是有效的.net程序集的解决办法
查看>>
一个2013届毕业生(踏上IT行业)的迷茫(3)
查看>>
关于try 和 throw 简单示例
查看>>
重装VS 2008出现的加载安装组件出错的终极解决方案
查看>>
div+css基础——9.div其它常用属性
查看>>
时间转换成模板字符串
查看>>
关于Http跨域时的Option请求
查看>>
jdk安装
查看>>
vue整合mui
查看>>
js8月-4号,,思想
查看>>
WPF RichTextBox不自动换行
查看>>
linux系统下vim命令详解
查看>>
golang iris下面的websocket
查看>>
A* 算法入门(转)
查看>>
hdu 3030 Increasing Speed Limits (离散化+树状数组+DP思想)
查看>>
信教对你的大脑有益(转)------做一个会祈祷的***无神论者!!!
查看>>
β阶段第一周版本控制报告
查看>>
qml demo分析(abstractitemmodel-数据分离)
查看>>
Hibernate配置文件
查看>>