This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch steve-link
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fd028558f21d9890d0c6de9d779268f2e5ee9a49
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Dec 1 21:04:24 2023 +0800

    refactor
---
 .../ConcurrentIterableLinkedQueue.java             | 294 ++++++++++-----------
 .../ConcurrentIterableLinkedQueueTest.java         |  13 +-
 2 files changed, 145 insertions(+), 162 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
index 5e22ffabce8..993dd4e63f2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
@@ -19,255 +19,239 @@
 
 package org.apache.iotdb.commons.pipe.datastructure;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-/**
- * This class allows dynamic iterating over the elements, namely an iterator is able to read the
- * incoming element.
- *
- * @param <E> Element type
- */
 public class ConcurrentIterableLinkedQueue<E> {
-  LinkedListNode<E> pilot = new LinkedListNode<>(null);
-  LinkedListNode<E> first;
-  LinkedListNode<E> last;
-  ReentrantLock lock = new ReentrantLock();
 
-  Condition hasNext = lock.newCondition();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConcurrentIterableLinkedQueue.class);
+
+  private static class LinkedListNode<E> {
+
+    private E data;
+    private LinkedListNode<E> next;
+
+    public LinkedListNode(E data) {
+      this.data = data;
+      this.next = null;
+    }
+  }
+
+  private final LinkedListNode<E> dummyNode = new LinkedListNode<>(null);
+  private LinkedListNode<E> firstNode;
+  private LinkedListNode<E> lastNode;
 
-  // The index of elements are [firstIndex, firstIndex + 1, ....,lastIndex - 1]
-  int firstIndex = 0;
-  int lastIndex = 0;
+  // The indexes of elements are (firstIndex, firstIndex + 1, ...., lastIndex - 1)
+  private volatile long firstIndex = 0;
+  private volatile long lastIndex = 0;
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition hasNextCondition = lock.newCondition();
 
   public ConcurrentIterableLinkedQueue() {
-    // first == last == null
+    firstNode = dummyNode;
+    lastNode = dummyNode;
+  }
+
+  public boolean isEmpty() {
+    lock.lock();
+    try {
+      return firstNode == dummyNode;
+    } finally {
+      lock.unlock();
+    }
   }
 
   public void add(E e) {
+    if (e == null) {
+      throw new IllegalArgumentException("The element to be added cannot be 
null");
+    }
+
+    final LinkedListNode<E> newNode = new LinkedListNode<>(e);
+
     lock.lock();
     try {
-      if (e == null) {
-        throw new IllegalArgumentException(
-            "Null is reserved as the signal to imply a get operation failure. 
Please use another element to imply null.");
-      }
-      final LinkedListNode<E> l = last;
-      final LinkedListNode<E> newNode = new LinkedListNode<>(e);
-      last = newNode;
-      if (l == null) {
-        first = newNode;
-        pilot.next = first;
-      } else {
-        l.next = newNode;
+      if (firstNode == dummyNode) {
+        firstNode = newNode;
       }
+
+      lastNode.next = newNode;
+      lastNode = newNode;
+
       ++lastIndex;
-      hasNext.signalAll();
+
+      hasNextCondition.signalAll();
     } finally {
       lock.unlock();
     }
   }
 
-  public void removeBefore(int newFirst) {
+  public void removeBefore(long newFirstIndex) {
     lock.lock();
     try {
-      if (newFirst <= firstIndex) {
+      newFirstIndex = Math.min(newFirstIndex, lastIndex);
+      if (newFirstIndex <= firstIndex) {
         return;
       }
-      LinkedListNode<E> next;
-      LinkedListNode<E> x;
-      for (x = first; x != null && firstIndex < newFirst; ++firstIndex, x = 
next) {
-        next = x.next;
-        x.data = null;
-        x.next = null;
+      // assert firstIndex < newFirstIndex
+
+      LinkedListNode<E> currentNode = firstNode;
+
+      for (long i = firstIndex; i < newFirstIndex; ++i) {
+        final LinkedListNode<E> nextNode = currentNode.next;
+        currentNode.data = null;
+        currentNode.next = null;
+        currentNode = nextNode;
       }
-      first = x;
-      pilot.next = first;
-      if (first == null) {
-        last = null;
+
+      firstNode = currentNode;
+      firstIndex = newFirstIndex;
+
+      if (firstIndex == lastIndex) {
+        firstNode = dummyNode;
+        lastNode = dummyNode;
       }
-      hasNext.signalAll();
+
+      hasNextCondition.signalAll();
     } finally {
       lock.unlock();
     }
   }
 
   public void clear() {
+    removeBefore(lastIndex);
+  }
+
+  public long getFirstIndex() {
     lock.lock();
     try {
-      removeBefore(lastIndex);
-      firstIndex = 0;
-      lastIndex = 0;
+      return firstIndex;
     } finally {
       lock.unlock();
     }
   }
 
-  public DynamicIterator iterateFrom(int offset) {
-    return new DynamicIterator(offset);
-  }
-
-  public int getFirstIndex() {
+  public long getLastIndex() {
     lock.lock();
     try {
-      return firstIndex;
+      return lastIndex;
     } finally {
       lock.unlock();
     }
   }
 
-  public int getLastIndex() {
+  public Iterator iterateFromEarliest() {
     lock.lock();
     try {
-      return lastIndex;
+      return iterateFrom(firstIndex);
     } finally {
       lock.unlock();
     }
   }
 
-  public DynamicIterator iterateFromEarliest() {
-    return iterateFrom(Integer.MIN_VALUE);
-  }
-
-  public DynamicIterator iterateFromLatest() {
-    return iterateFrom(Integer.MAX_VALUE);
+  public Iterator iterateFromLatest() {
+    lock.lock();
+    try {
+      return iterateFrom(lastIndex);
+    } finally {
+      lock.unlock();
+    }
   }
 
-  private static class LinkedListNode<E> {
-    E data;
-    LinkedListNode<E> next;
-
-    public LinkedListNode(E data) {
-      this.data = data;
-      this.next = null;
+  public Iterator iterateFrom(long index) {
+    lock.lock();
+    try {
+      return new Iterator(index);
+    } finally {
+      lock.unlock();
     }
   }
 
-  // Temporarily, we do not use read lock because read lock in java does not
-  // support condition. Besides, The pure park and un-park method is fairly slow,
-  // thus we use Reentrant lock here.
-  public class DynamicIterator {
-
-    private LinkedListNode<E> next;
+  /** NOTE: not thread-safe. */
+  public class Iterator implements java.util.Iterator<E> {
 
-    // Offset is the position of the next element to read
-    private int offset;
-
-    DynamicIterator(int offset) {
-      lock.lock();
-      try {
-        if (last != null && offset >= lastIndex) {
-          next = last;
-          offset = lastIndex;
-        } else {
-          next = pilot;
-          if (firstIndex < offset) {
-            for (int i = 0; i < offset - firstIndex; ++i) {
-              next();
-            }
-          } else {
-            offset = firstIndex;
-          }
-        }
-        this.offset = offset;
-      } finally {
-        lock.unlock();
-      }
-    }
+    private LinkedListNode<E> currentNode;
+    private long currentIndex;
 
-    /**
-     * The getter of the iterator. Never timeOut.
-     *
-     * @return
-     *     <p>1. Directly The next element if exists.
-     *     <p>2. Blocking until available iff the next element does not exist.
-     *     <p>3. Null iff the current element is null or the next element's data is null.
-     */
-    public E next() {
-      return next(Long.MAX_VALUE);
+    private Iterator(long index) {
+      seek(index);
     }
 
     /**
-     * The getter of the iterator with the given timeOut.
+     * Seek the {@link Iterator#currentIndex} to the closest position allowed to the given index.
+     * Note that one can seek to {@link ConcurrentIterableLinkedQueue#lastIndex} to subscribe the
+     * next incoming element.
      *
-     * @param waitTimeMillis the timeOut of the get operation
-     * @return
-     *     <p>1. Directly The next element if exists.
-     *     <p>2. Blocking until available iff the next element does not exist.
-     *     <p>3. Null iff the current element is null or the next element's data is null.
+     * @param index the attempt index
+     * @return the actual new index
      */
-    public E next(long waitTimeMillis) {
+    public long seek(long index) {
       lock.lock();
       try {
-        while (!hasNext()) {
-          if (!hasNext.await(waitTimeMillis, TimeUnit.MILLISECONDS)) {
-            return null;
-          }
+        currentNode = firstNode;
+        currentIndex = firstIndex;
+
+        final long targetIndex = Math.max(firstIndex, Math.min(index, 
lastIndex));
+        for (long i = 0; i < targetIndex - firstIndex; ++i) {
+          moveToNext();
         }
 
-        next = next.next;
-        ++offset;
-        return next.data;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } catch (NullPointerException ignore) {
-        // NullPointerException means the "next" node is null, typically because the
-        // element is cleared. Though we don't except a linked queue to be cleared
-        // when there are subscriptions alive, still we simply return null to notify the subscriber.
+        return currentIndex;
       } finally {
         lock.unlock();
       }
-      return null;
     }
 
+    private void moveToNext() {
+      if (currentNode.next != null) {
+        currentNode = currentNode.next;
+        ++currentIndex;
+      }
+    }
+
+    @Override
     public boolean hasNext() {
       lock.lock();
       try {
-        return next.next != null;
+        return currentNode.next != null;
       } finally {
         lock.unlock();
       }
     }
 
-    /**
-     * Seek the {@link DynamicIterator#offset} to the closest position allowed to the given offset.
-     * Note that one can seek to {@link ConcurrentIterableLinkedQueue#lastIndex} to subscribe the
-     * next incoming element.
-     *
-     * @param newOffset the attempt newOffset
-     * @return the actual new offset
-     */
-    public int seek(int newOffset) {
+    @Override
+    public E next() {
+      return next(Long.MAX_VALUE);
+    }
+
+    public E next(long waitTimeMillis) {
       lock.lock();
       try {
-        if (newOffset < firstIndex) {
-          newOffset = firstIndex;
-        }
-        if (newOffset > lastIndex) {
-          newOffset = lastIndex;
-        }
-        int oldOffset = offset;
-        if (newOffset < oldOffset) {
-          next = pilot;
-          offset = firstIndex;
-          for (int i = 0; i < newOffset - firstIndex; ++i) {
-            next();
-          }
-        } else {
-          for (int i = 0; i < newOffset - oldOffset; ++i) {
-            next();
+        while (!hasNext()) {
+          if (!hasNextCondition.await(waitTimeMillis, TimeUnit.MILLISECONDS)) {
+            return null;
           }
         }
-        return offset;
+
+        moveToNext();
+
+        return currentNode.data;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
       } finally {
         lock.unlock();
       }
     }
 
-    public int getOffset() {
+    public long getCurrentIndex() {
       lock.lock();
       try {
-        return offset;
+        return currentIndex;
       } finally {
         lock.unlock();
       }
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
index fdb713cf47c..79d872ecdb9 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
@@ -57,7 +57,7 @@ public class ConcurrentIterableLinkedQueueTest {
     queue.add(2);
     queue.add(3);
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFrom(0);
+    ConcurrentIterableLinkedQueue<Integer>.Iterator itr = queue.iterateFrom(0);
     itr.seek(2);
     Assert.assertEquals(Integer.valueOf(3), itr.next());
   }
@@ -65,7 +65,7 @@ public class ConcurrentIterableLinkedQueueTest {
   @Test(timeout = 60000)
   public void testTimedGet() {
     queue.add(1);
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr = 
queue.iterateFromEarliest();
+    ConcurrentIterableLinkedQueue<Integer>.Iterator itr = 
queue.iterateFromEarliest();
     Assert.assertEquals(1, (int) itr.next(1000));
     Assert.assertNull(itr.next(1000));
   }
@@ -115,11 +115,11 @@ public class ConcurrentIterableLinkedQueueTest {
     Assert.assertEquals(1, queue.getFirstIndex());
     Assert.assertEquals(2, queue.getLastIndex());
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it = 
queue.iterateFromEarliest();
+    ConcurrentIterableLinkedQueue<Integer>.Iterator it = 
queue.iterateFromEarliest();
     Assert.assertEquals(2, (int) it.next());
 
-    ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 = 
queue.iterateFromLatest();
-    Assert.assertEquals(2, it2.getOffset());
+    ConcurrentIterableLinkedQueue<Integer>.Iterator it2 = 
queue.iterateFromLatest();
+    Assert.assertEquals(2, it2.getCurrentIndex());
     AtomicInteger value = new AtomicInteger(-1);
     new Thread(() -> value.set(it2.next())).start();
     queue.add(3);
@@ -173,8 +173,7 @@ public class ConcurrentIterableLinkedQueueTest {
           new Thread(
               () -> {
                 try {
-                  ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it =
-                      queue.iterateFromEarliest();
+                  ConcurrentIterableLinkedQueue<Integer>.Iterator it = 
queue.iterateFromEarliest();
                   for (int j = 0; j < 20000; ++j) {
                     it.next();
                   }

Reply via email to