This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch steve-link
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1f477f28091766b6df2885b5a0e3ce4fb30245d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Dec 1 21:32:06 2023 +0800

    Update ConcurrentIterableLinkedQueue.java
---
 .../ConcurrentIterableLinkedQueue.java             | 105 ++++++++++-----------
 1 file changed, 50 insertions(+), 55 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
index 993dd4e63f2..dbaf827274e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueue.java
@@ -35,19 +35,17 @@ public class ConcurrentIterableLinkedQueue<E> {
     private E data;
     private LinkedListNode<E> next;
 
-    public LinkedListNode(E data) {
+    private LinkedListNode(E data) {
       this.data = data;
       this.next = null;
     }
   }
 
   private final LinkedListNode<E> dummyNode = new LinkedListNode<>(null);
-  private LinkedListNode<E> firstNode;
-  private LinkedListNode<E> lastNode;
-
-  // The indexes of elements are (firstIndex, firstIndex + 1, ...., lastIndex 
- 1)
-  private volatile long firstIndex = 0;
-  private volatile long lastIndex = 0;
+  private volatile LinkedListNode<E> firstNode;
+  private volatile LinkedListNode<E> lastNode;
+  private volatile long firstIndex = -1;
+  private volatile long lastIndex = -1;
 
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition hasNextCondition = lock.newCondition();
@@ -60,7 +58,7 @@ public class ConcurrentIterableLinkedQueue<E> {
   public boolean isEmpty() {
     lock.lock();
     try {
-      return firstNode == dummyNode;
+      return firstIndex == lastIndex;
     } finally {
       lock.unlock();
     }
@@ -72,18 +70,18 @@ public class ConcurrentIterableLinkedQueue<E> {
     }
 
     final LinkedListNode<E> newNode = new LinkedListNode<>(e);
-
     lock.lock();
     try {
+      ++lastIndex;
+
       if (firstNode == dummyNode) {
+        firstIndex = lastIndex;
         firstNode = newNode;
       }
 
       lastNode.next = newNode;
       lastNode = newNode;
 
-      ++lastIndex;
-
       hasNextCondition.signalAll();
     } finally {
       lock.unlock();
@@ -91,18 +89,20 @@ public class ConcurrentIterableLinkedQueue<E> {
   }
 
   public void removeBefore(long newFirstIndex) {
+    if (newFirstIndex <= firstIndex) {
+      throw new IllegalArgumentException("New first index must be greater than 
the current first index.");
+    }
+
     lock.lock();
     try {
       newFirstIndex = Math.min(newFirstIndex, lastIndex);
       if (newFirstIndex <= firstIndex) {
         return;
       }
-      // assert firstIndex < newFirstIndex
 
       LinkedListNode<E> currentNode = firstNode;
-
       for (long i = firstIndex; i < newFirstIndex; ++i) {
-        final LinkedListNode<E> nextNode = currentNode.next;
+        LinkedListNode<E> nextNode = currentNode.next;
         currentNode.data = null;
         currentNode.next = null;
         currentNode = nextNode;
@@ -145,59 +145,51 @@ public class ConcurrentIterableLinkedQueue<E> {
   }
 
   public Iterator iterateFromEarliest() {
-    lock.lock();
-    try {
-      return iterateFrom(firstIndex);
-    } finally {
-      lock.unlock();
-    }
+    return new Iterator();
   }
 
   public Iterator iterateFromLatest() {
-    lock.lock();
-    try {
-      return iterateFrom(lastIndex);
-    } finally {
-      lock.unlock();
-    }
+    return iterateFrom(lastIndex);
   }
 
   public Iterator iterateFrom(long index) {
-    lock.lock();
-    try {
-      return new Iterator(index);
-    } finally {
-      lock.unlock();
-    }
+    return new Iterator(index);
   }
 
-  /** NOTE: not thread-safe. */
   public class Iterator implements java.util.Iterator<E> {
 
     private LinkedListNode<E> currentNode;
     private long currentIndex;
 
+    private Iterator() {
+      lock.lock();
+      try {
+        currentNode = dummyNode;
+        currentIndex = firstIndex - 1;
+      } finally {
+        lock.unlock();
+      }
+    }
+
     private Iterator(long index) {
-      seek(index);
+      lock.lock();
+      try {
+        seek(index);
+      } finally {
+        lock.unlock();
+      }
     }
 
-    /**
-     * Seek the {@link Iterator#currentIndex} to the closest position allowed 
to the given index.
-     * Note that one can seek to {@link 
ConcurrentIterableLinkedQueue#lastIndex} to subscribe the
-     * next incoming element.
-     *
-     * @param index the attempt index
-     * @return the actual new index
-     */
     public long seek(long index) {
       lock.lock();
       try {
         currentNode = firstNode;
         currentIndex = firstIndex;
 
-        final long targetIndex = Math.max(firstIndex, Math.min(index, 
lastIndex));
-        for (long i = 0; i < targetIndex - firstIndex; ++i) {
-          moveToNext();
+        index = Math.max(firstIndex, Math.min(index, lastIndex));
+        while (currentIndex < index && currentNode.next != null) {
+          currentNode = currentNode.next;
+          currentIndex++;
         }
 
         return currentIndex;
@@ -206,18 +198,11 @@ public class ConcurrentIterableLinkedQueue<E> {
       }
     }
 
-    private void moveToNext() {
-      if (currentNode.next != null) {
-        currentNode = currentNode.next;
-        ++currentIndex;
-      }
-    }
-
     @Override
     public boolean hasNext() {
       lock.lock();
       try {
-        return currentNode.next != null;
+        return currentNode != null && currentNode.next != null;
       } finally {
         lock.unlock();
       }
@@ -231,13 +216,14 @@ public class ConcurrentIterableLinkedQueue<E> {
     public E next(long waitTimeMillis) {
       lock.lock();
       try {
-        while (!hasNext()) {
+        while (currentNode.next == null) {
           if (!hasNextCondition.await(waitTimeMillis, TimeUnit.MILLISECONDS)) {
             return null;
           }
         }
 
-        moveToNext();
+        currentNode = currentNode.next;
+        currentIndex++;
 
         return currentNode.data;
       } catch (InterruptedException e) {
@@ -248,6 +234,15 @@ public class ConcurrentIterableLinkedQueue<E> {
       }
     }
 
+    public E getCurrent() {
+      lock.lock();
+      try {
+        return currentNode.data;
+      } finally {
+        lock.unlock();
+      }
+    }
+
     public long getCurrentIndex() {
       lock.lock();
       try {

Reply via email to