This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch steve-link in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fd028558f21d9890d0c6de9d779268f2e5ee9a49 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Dec 1 21:04:24 2023 +0800 refactor --- .../ConcurrentIterableLinkedQueue.java | 294 ++++++++++----------- .../ConcurrentIterableLinkedQueueTest.java | 13 +- 2 files changed, 145 insertions(+), 162 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java index 5e22ffabce8..993dd4e63f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java @@ -19,255 +19,239 @@ package org.apache.iotdb.commons.pipe.datastructure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -/** - * This class allows dynamic iterating over the elements, namely an iterator is able to read the - * incoming element. - * - * @param <E> Element type - */ public class ConcurrentIterableLinkedQueue<E> { - LinkedListNode<E> pilot = new LinkedListNode<>(null); - LinkedListNode<E> first; - LinkedListNode<E> last; - ReentrantLock lock = new ReentrantLock(); - Condition hasNext = lock.newCondition(); + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIterableLinkedQueue.class); + + private static class LinkedListNode<E> { + + private E data; + private LinkedListNode<E> next; + + public LinkedListNode(E data) { + this.data = data; + this.next = null; + } + } + + private final LinkedListNode<E> dummyNode = new LinkedListNode<>(null); + private LinkedListNode<E> firstNode; + private LinkedListNode<E> lastNode; - // The index of elements are [firstIndex, firstIndex + 1, ....,lastIndex - 1] - int firstIndex = 0; - int lastIndex = 0; + // The indexes of elements are (firstIndex, firstIndex + 1, ...., lastIndex - 1) + private volatile long firstIndex = 0; + private volatile long lastIndex = 0; + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition hasNextCondition = lock.newCondition(); public ConcurrentIterableLinkedQueue() { - // first == last == null + firstNode = dummyNode; + lastNode = dummyNode; + } + + public boolean isEmpty() { + lock.lock(); + try { + return firstNode == dummyNode; + } finally { + lock.unlock(); + } } public void add(E e) { + if (e == null) { + throw new IllegalArgumentException("The element to be added cannot be null"); + } + + final LinkedListNode<E> newNode = new LinkedListNode<>(e); + lock.lock(); try { - if (e == null) { - throw new IllegalArgumentException( - "Null is reserved as the signal to imply a get operation failure. Please use another element to imply null."); - } - final LinkedListNode<E> l = last; - final LinkedListNode<E> newNode = new LinkedListNode<>(e); - last = newNode; - if (l == null) { - first = newNode; - pilot.next = first; - } else { - l.next = newNode; + if (firstNode == dummyNode) { + firstNode = newNode; } + + lastNode.next = newNode; + lastNode = newNode; + ++lastIndex; - hasNext.signalAll(); + + hasNextCondition.signalAll(); } finally { lock.unlock(); } } - public void removeBefore(int newFirst) { + public void removeBefore(long newFirstIndex) { lock.lock(); try { - if (newFirst <= firstIndex) { + newFirstIndex = Math.min(newFirstIndex, lastIndex); + if (newFirstIndex <= firstIndex) { return; } - LinkedListNode<E> next; - LinkedListNode<E> x; - for (x = first; x != null && firstIndex < newFirst; ++firstIndex, x = next) { - next = x.next; - x.data = null; - x.next = null; + // assert firstIndex < newFirstIndex + + LinkedListNode<E> currentNode = firstNode; + + for (long i = firstIndex; i < newFirstIndex; ++i) { + final LinkedListNode<E> nextNode = currentNode.next; + currentNode.data = null; + currentNode.next = null; + currentNode = nextNode; } - first = x; - pilot.next = first; - if (first == null) { - last = null; + + firstNode = currentNode; + firstIndex = newFirstIndex; + + if (firstIndex == lastIndex) { + firstNode = dummyNode; + lastNode = dummyNode; } - hasNext.signalAll(); + + hasNextCondition.signalAll(); } finally { lock.unlock(); } } public void clear() { + removeBefore(lastIndex); + } + + public long getFirstIndex() { lock.lock(); try { - removeBefore(lastIndex); - firstIndex = 0; - lastIndex = 0; + return firstIndex; } finally { lock.unlock(); } } - public DynamicIterator iterateFrom(int offset) { - return new DynamicIterator(offset); - } - - public int getFirstIndex() { + public long getLastIndex() { lock.lock(); try { - return firstIndex; + return lastIndex; } finally { lock.unlock(); } } - public int getLastIndex() { + public Iterator iterateFromEarliest() { lock.lock(); try { - return lastIndex; + return iterateFrom(firstIndex); } finally { lock.unlock(); } } - public DynamicIterator iterateFromEarliest() { - return iterateFrom(Integer.MIN_VALUE); - } - - public DynamicIterator iterateFromLatest() { - return iterateFrom(Integer.MAX_VALUE); + public Iterator iterateFromLatest() { + lock.lock(); + try { + return iterateFrom(lastIndex); + } finally { + lock.unlock(); + } } - private static class LinkedListNode<E> { - E data; - LinkedListNode<E> next; - - public LinkedListNode(E data) { - this.data = data; - this.next = null; + public Iterator iterateFrom(long index) { + lock.lock(); + try { + return new Iterator(index); + } finally { + lock.unlock(); } } - // Temporarily, we do not use read lock because read lock in java does not - // support condition. Besides, The pure park and un-park method is fairly slow, - // thus we use Reentrant lock here. - public class DynamicIterator { - - private LinkedListNode<E> next; + /** NOTE: not thread-safe. */ + public class Iterator implements java.util.Iterator<E> { - // Offset is the position of the next element to read - private int offset; - - DynamicIterator(int offset) { - lock.lock(); - try { - if (last != null && offset >= lastIndex) { - next = last; - offset = lastIndex; - } else { - next = pilot; - if (firstIndex < offset) { - for (int i = 0; i < offset - firstIndex; ++i) { - next(); - } - } else { - offset = firstIndex; - } - } - this.offset = offset; - } finally { - lock.unlock(); - } - } + private LinkedListNode<E> currentNode; + private long currentIndex; - /** - * The getter of the iterator. Never timeOut. - * - * @return - * <p>1. Directly The next element if exists. - * <p>2. Blocking until available iff the next element does not exist. - * <p>3. Null iff the current element is null or the next element's data is null. - */ - public E next() { - return next(Long.MAX_VALUE); + private Iterator(long index) { + seek(index); } /** - * The getter of the iterator with the given timeOut. + * Seek the {@link Iterator#currentIndex} to the closest position allowed to the given index. + * Note that one can seek to {@link ConcurrentIterableLinkedQueue#lastIndex} to subscribe the + * next incoming element. * - * @param waitTimeMillis the timeOut of the get operation - * @return - * <p>1. Directly The next element if exists. - * <p>2. Blocking until available iff the next element does not exist. - * <p>3. Null iff the current element is null or the next element's data is null. + * @param index the attempt index + * @return the actual new index */ - public E next(long waitTimeMillis) { + public long seek(long index) { lock.lock(); try { - while (!hasNext()) { - if (!hasNext.await(waitTimeMillis, TimeUnit.MILLISECONDS)) { - return null; - } + currentNode = firstNode; + currentIndex = firstIndex; + + final long targetIndex = Math.max(firstIndex, Math.min(index, lastIndex)); + for (long i = 0; i < targetIndex - firstIndex; ++i) { + moveToNext(); } - next = next.next; - ++offset; - return next.data; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (NullPointerException ignore) { - // NullPointerException means the "next" node is null, typically because the - // element is cleared. Though we don't except a linked queue to be cleared - // when there are subscriptions alive, still we simply return null to notify the subscriber. + return currentIndex; } finally { lock.unlock(); } - return null; } + private void moveToNext() { + if (currentNode.next != null) { + currentNode = currentNode.next; + ++currentIndex; + } + } + + @Override public boolean hasNext() { lock.lock(); try { - return next.next != null; + return currentNode.next != null; } finally { lock.unlock(); } } - /** - * Seek the {@link DynamicIterator#offset} to the closest position allowed to the given offset. - * Note that one can seek to {@link ConcurrentIterableLinkedQueue#lastIndex} to subscribe the - * next incoming element. - * - * @param newOffset the attempt newOffset - * @return the actual new offset - */ - public int seek(int newOffset) { + @Override + public E next() { + return next(Long.MAX_VALUE); + } + + public E next(long waitTimeMillis) { lock.lock(); try { - if (newOffset < firstIndex) { - newOffset = firstIndex; - } - if (newOffset > lastIndex) { - newOffset = lastIndex; - } - int oldOffset = offset; - if (newOffset < oldOffset) { - next = pilot; - offset = firstIndex; - for (int i = 0; i < newOffset - firstIndex; ++i) { - next(); - } - } else { - for (int i = 0; i < newOffset - oldOffset; ++i) { - next(); + while (!hasNext()) { + if (!hasNextCondition.await(waitTimeMillis, TimeUnit.MILLISECONDS)) { + return null; } } - return offset; + + moveToNext(); + + return currentNode.data; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; } finally { lock.unlock(); } } - public int getOffset() { + public long getCurrentIndex() { lock.lock(); try { - return offset; + return currentIndex; } finally { lock.unlock(); } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java index fdb713cf47c..79d872ecdb9 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java @@ -57,7 +57,7 @@ public class ConcurrentIterableLinkedQueueTest { queue.add(2); queue.add(3); - ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFrom(0); + ConcurrentIterableLinkedQueue<Integer>.Iterator itr = queue.iterateFrom(0); itr.seek(2); Assert.assertEquals(Integer.valueOf(3), itr.next()); } @@ -65,7 +65,7 @@ public class ConcurrentIterableLinkedQueueTest { @Test(timeout = 60000) public void testTimedGet() { queue.add(1); - ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = queue.iterateFromEarliest(); + ConcurrentIterableLinkedQueue<Integer>.Iterator itr = queue.iterateFromEarliest(); Assert.assertEquals(1, (int) itr.next(1000)); Assert.assertNull(itr.next(1000)); } @@ -115,11 +115,11 @@ public class ConcurrentIterableLinkedQueueTest { Assert.assertEquals(1, queue.getFirstIndex()); Assert.assertEquals(2, queue.getLastIndex()); - ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = queue.iterateFromEarliest(); + ConcurrentIterableLinkedQueue<Integer>.Iterator it = queue.iterateFromEarliest(); Assert.assertEquals(2, (int) it.next()); - ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 = queue.iterateFromLatest(); - Assert.assertEquals(2, it2.getOffset()); + ConcurrentIterableLinkedQueue<Integer>.Iterator it2 = queue.iterateFromLatest(); + Assert.assertEquals(2, it2.getCurrentIndex()); AtomicInteger value = new AtomicInteger(-1); new Thread(() -> value.set(it2.next())).start(); queue.add(3); @@ -173,8 +173,7 @@ public class ConcurrentIterableLinkedQueueTest { new Thread( () -> { try { - ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = - queue.iterateFromEarliest(); + ConcurrentIterableLinkedQueue<Integer>.Iterator it = queue.iterateFromEarliest(); for (int j = 0; j < 20000; ++j) { it.next(); }
