一、LinkedBlockingDeque简介
java6增加了两种容器类型,Deque和BlockingDeque,它们分别对Queue和BlockingQueue进行了扩展。
Deque是一个双端队列,deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。实现了在队列头和队列尾的高效插入和移除。 BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。 正如阻塞队列使用与生产者-消费者模式,双端队列同样适用于另一种相关模式,即工作密取。在生产者-消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其它消费者双端队列末尾秘密地获取工作。密取工作模式比传统的生产者-消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争程度。 LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
BlockingDeque 的使用
在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。BlockingDeque 图解:
一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。
一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。BlockingDeque 的方法
BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
移除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
检查 | getFirst(o) | peekFirst(o) |
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
移除 | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
检查 | getLast(o) | peekLast(o) |
- 抛异常:如果试图的操作无法立即执行,抛一个异常。
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
BlockingDeque 继承自 BlockingQueue
BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。
以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:
BlockingQueue | BlockingDeque |
---|---|
add() | addLast() |
offer() x 2 | offerLast() x 2 |
put() | putLast() |
remove() | removeFirst() |
poll() x 2 | pollFirst() |
take() | takeFirst() |
element() | getFirst() |
peek() | peekFirst() |
二、LinkedBlockingDeque源码分析
2.1、LinkedBlockingDeque的lock
LinkedBlockingDeque的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。LinkedBlockingDeque是一个带有长度的阻塞队列,初始化的时候可以指定队列长度(如果不指定就是Integer.MAX_VALUE),且指定长度之后不允许进行修改。
/** Main lock guarding all access */ final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); /** Condition for waiting puts */ private final Condition notFull = lock.newCondition();
2.2、数据结构
双向链表
/** 双向链表节点 */ static final class Node{ /** * 元素值 */ E item; /** * 节点前驱 * 1.指向前驱;2.指向this,说明前驱是尾节点,看unlinklast;3.指向null说明没有前驱 */ Node prev; /** * 节点后继 * 1.指向后继;2.指向this,说明后继是头结点,看unlinkfirst;3.指向null说明没有后继 */ Node next; Node(E x) { item = x; } }
2.3、成员变量
//first是双向链表的表头 transient Nodefirst; //last是双向链表的表尾 transient Node last; //count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数 private transient int count; //是LinkedBlockingDeque的容量,它是在创建LinkedBlockingDeque时指定的 private final int capacity;
2.4、构造函数
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
2.5、入队
addFirst,addLast分别调用offerFirst,offerLast,而offerFirst,offerLast和putFirst,putLast都是调用了linkFirst和linkLast。
public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Nodenode = new Node (e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } } public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node node = new Node (e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } } public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node node = new Node (e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { lock.unlock(); } } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node node = new Node (e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) notFull.await(); } finally { lock.unlock(); } }
linkFirst和linkLast
/** * 设置node为链表头节点,链表满时为false */ private boolean linkFirst(Nodenode) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) //超过容量false return false; Node f = first; node.next = f; //新节点的next指向原first first = node; //设置node为新的first if (last == null) //没有尾节点,就将node设置成尾节点 last = node; else f.prev = node; //有尾节点,那就将之前first的pre指向新增node ++count; //累加节点数量 notEmpty.signal(); //有新节点入队,通知非空条件队列 return true; } /** * 设置node为链表尾节点,链表满时为false */ private boolean linkLast(Node node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; Node l = last; node.prev = l; last = node; if (first == null) //为null,说明之前队列空吧,那就first也指向node first = node; else l.next = node; //非null,说明之前的last有值,就将之前的last的next指向node ++count; notEmpty.signal(); return true; }
2.6、出队
public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; } public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } } public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
核心方法
/** * 移除头结点,链表空返回null */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Nodef = first; if (f == null) return null; //空返回null Node n = f.next; E item = f.item; f.item = null; f.next = f; // help GC first = n; if (n == null) //说明之前应该只有一个节点,移除头结点后,链表空,现在first和last都指向null了 last = null; else n.prev = null; //否则的话,n的pre原来指向之前的first,现在n变为first了,pre指向null --count; notFull.signal(); //通知非满条件队列 return item; } /** * 移除尾结点,链表空返回null */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node l = last; if (l == null) return null; Node p = l.prev; E item = l.item; l.item = null; l.prev = l; // help GC last = p; if (p == null) first = null; else p.next = null; --count; notFull.signal(); return item; } /** * 移除指定节点:p--》x--》n */ void unlink(Node x) { // assert lock.isHeldByCurrentThread(); Node p = x.prev; Node n = x.next; if (p == null) { //prev为null说明x节点为头结点 unlinkFirst(); } else if (n == null) { unlinkLast(); //nex为null说明待清除节点为尾节点 } else { //否则的话节点处于链表中间 p.next = n; //将p和n互链 n.prev = p; x.item = null; // 没有断开x节点链接,可能有其他线程在迭代链表 --count; notFull.signal(); } }
2.7、peek方法
public E peek() { return peekFirst(); } public E peekFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } //peekLast就不贴了
2.8、size方法
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
三、JDK或开源框架中使用
四、使用示例
java.util.ArrayDeque 类提供了可调整大小的阵列,并实现了Deque接口。以下是关于阵列双端队列的要点:Java.util.ArrayDeque
-
数组双端队列没有容量限制,使他们增长为必要支持使用。
-
它们不是线程安全的;如果没有外部同步。
-
不支持多线程并发访问。
-
null元素被禁止使用在数组deques。
-
它们要比堆栈Stack和LinkedList快。
此类及其迭代器实现Collection和Iteratorinterfaces方法可选。
类的声明
以下是java.util.ArrayDeque类的声明:
public class ArrayDequeextends AbstractCollection implements Deque , Cloneable, Serializable
这里<E>代表一个元素,它可以是任何类。例如,如果你正在构建一个整数数组列表,那么初始化可为
ArrayListlist = new ArrayList ();
S.N. | 方法 & 描述 |
---|---|
1 | 此方法将添加指定的元素,在此deque队列的末尾。 |
2 | 此方法将添加指定的元素,在此deque队列的前面。 |
3 | 此方法将插入指定的元素,在此deque队列的末尾。 |
4 | 此方法移除此deque队列的元素。 |
5 | 此方法返回此deque队列的副本。 |
6 | 如果此deque 队列包含指定的元素,此方法返回true。 |
7 | 此方法返回一个迭代器在此deque队列以逆向顺序的元素。 |
8 | 此方法检索,但是不移除此deque队列表示的队列的头部。 |
9 | 此方法检索,但是不移除此deque队列的第一个元素。 |
10 | 此方法检索,但是不移除此deque队列的最后一个元素。 |
11 | 如果此deque队列不包含元素,此方法返回true。 |
12 | 此方法返回一个迭代器在此deque队列的元素。 |
13 | 此方法将指定的元素,在此deque队列的末尾。 |
14 | 此方法将指定的元素,在此deque队列的前面。 |
15 | 此方法将指定的元素,在此deque队列的末尾。 |
16 | 此方法检索,但是不移除此deque队列表示的队列的头部,如果此deque队列为空,则返回null。 |
17 | 此方法检索,但是不移除此deque 队列的第一个元素,或者如果此deque 队列为空,则返回null。 |
18 | 此方法检索,但是不移除此deque队列的最后一个元素,如果此deque队列为空,则返回null。 |
19 | 此方法检索并移除此deque队列表示的队列的头部,如果此deque队列为空,则返回null。 |
20 | 此方法检索并移除此deque队列的第一个元素,或者如果此deque队列为空,则返回null。 |
21 | 此方法检索并移除此deque队列的最后一个元素,如果此deque队列为空,则返回null。 |
22 | 这种方法的此deque队列所表示的堆栈弹出一个元素。 |
23 | 这种方法将元素推入此deque队列所表示的堆栈。 |
24 | 此方法检索并移除此deque队列表示的队列的头部。 |
25 | 此方法从此deque队列中移除指定元素的单个实例。 |
26 | 此方法检索并移除此deque队列的第一个元素。 |
27 | 此方法移除此deque队列的指定元素的第一个匹配。 |
28 | 此方法检索并移除此deque队列的最后一个元素。 |
29 | 此方法移除此deque队列的指定元素的最后一次出现。 |
30 | 此方法返回在此deque队列的元素个数。 |
31 | 这个方法返回一个包含所有在此deque队列在适当的序列中元素的数组。 |