This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e2bd04d87c4 Pipe: Fix iterator.hasNext() == true after removal from empty ConcurrentIterableLinkedQueue (#12514)
e2bd04d87c4 is described below
commit e2bd04d87c45f9639f6efef5beb8c1ebc6942893
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 13 15:17:37 2024 +0800
Pipe: Fix iterator.hasNext() == true after removal from empty ConcurrentIterableLinkedQueue (#12514)
---
.../queue/ConcurrentIterableLinkedQueue.java | 20 +++---
.../ConcurrentIterableLinkedQueueTest.java | 84 ++++++++++++----------
2 files changed, 59 insertions(+), 45 deletions(-)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
index 7672ef73371..78a8c1a75fa 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
@@ -40,7 +40,7 @@ public class ConcurrentIterableLinkedQueue<E> {
private E data;
private LinkedListNode<E> next;
- private LinkedListNode(E data) {
+ private LinkedListNode(final E data) {
this.data = data;
this.next = null;
}
@@ -65,7 +65,7 @@ public class ConcurrentIterableLinkedQueue<E> {
*
* @param e the element to be added, which cannot be {@code null}
*/
- public void add(E e) {
+ public void add(final E e) {
if (e == null) {
throw new IllegalArgumentException("Null element is not allowed.");
}
@@ -125,7 +125,11 @@ public class ConcurrentIterableLinkedQueue<E> {
}
firstNode = currentNode;
- pilotNode.next = firstNode;
+ // pilotNode.next shall be null when the queue is empty and firstNode == pilotNode
+ // to make iterator.hasNext() == false when the iterator is on the pilotNode
+ if (firstNode != pilotNode) {
+ pilotNode.next = firstNode;
+ }
// Reset firstNode and lastNode to pilotNode if the queue becomes empty
if (firstNode == null) {
@@ -200,7 +204,7 @@ public class ConcurrentIterableLinkedQueue<E> {
}
}
- public void setFirstIndex(long firstIndex) {
+ public void setFirstIndex(final long firstIndex) {
lock.writeLock().lock();
try {
this.firstIndex = firstIndex;
@@ -218,7 +222,7 @@ public class ConcurrentIterableLinkedQueue<E> {
* If the queue is empty, the given index is valid if it is equal to {@link
* ConcurrentIterableLinkedQueue#firstIndex}.
*/
- public boolean isNextIndexValid(long nextIndex) {
+ public boolean isNextIndexValid(final long nextIndex) {
lock.readLock().lock();
try {
return firstIndex <= nextIndex && nextIndex <= tailIndex;
@@ -231,7 +235,7 @@ public class ConcurrentIterableLinkedQueue<E> {
return !iteratorSet.isEmpty();
}
- public DynamicIterator iterateFrom(long offset) {
+ public DynamicIterator iterateFrom(final long offset) {
final DynamicIterator iterator = new DynamicIterator(offset);
iteratorSet.put(iterator, iterator);
return iterator;
@@ -289,7 +293,7 @@ public class ConcurrentIterableLinkedQueue<E> {
* @return the next element in the queue. {@code null} if the queue is closed, or if the waiting
* time elapsed, or the thread is interrupted
*/
- public E next(long waitTimeMillis) {
+ public E next(final long waitTimeMillis) {
lock.writeLock().lock();
try {
while (!hasNext()) {
@@ -306,7 +310,7 @@ public class ConcurrentIterableLinkedQueue<E> {
++nextIndex;
return currentNode.data;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted while waiting for next element.", e);
return null;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
index f04bc41a984..a7a7310b113 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/ConcurrentIterableLinkedQueueTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertFalse;
public class ConcurrentIterableLinkedQueueTest {
- ConcurrentIterableLinkedQueue<Integer> queue;
+ private ConcurrentIterableLinkedQueue<Integer> queue;
@Before
public void setUp() {
@@ -60,7 +60,7 @@ public class ConcurrentIterableLinkedQueueTest {
queue.add(2);
queue.add(3);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
itr.seek(2);
Assert.assertEquals(Integer.valueOf(3), itr.next());
}
@@ -68,7 +68,7 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testTimedGet() {
queue.add(1);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromEarliest();
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromEarliest();
Assert.assertEquals(1, (int) itr.next(1000));
Assert.assertNull(itr.next(1000));
}
@@ -80,8 +80,8 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testConcurrentAddAndRemove() throws InterruptedException {
- int numberOfAdds = 500;
- ExecutorService executor = Executors.newFixedThreadPool(2);
+ final int numberOfAdds = 500;
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
// Thread 1 adds elements to the queue
executor.submit(
@@ -110,9 +110,9 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testIterateFromEmptyQueue() {
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(1);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(1);
- AtomicInteger value = new AtomicInteger(-1);
+ final AtomicInteger value = new AtomicInteger(-1);
new Thread(() -> value.set(itr.next())).start();
queue.add(3);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(3,
value.get()));
@@ -120,8 +120,8 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testContinuousEmptyNext() throws InterruptedException {
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
- AtomicInteger consumedValue = new AtomicInteger(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final AtomicInteger consumedValue = new AtomicInteger(0);
new Thread(
() -> {
while (true) {
@@ -143,7 +143,7 @@ public class ConcurrentIterableLinkedQueueTest {
public void testRemove() {
queue.add(1);
queue.add(2);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(1);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(1);
Assert.assertEquals(1, queue.tryRemoveBefore(Long.MAX_VALUE));
Assert.assertEquals(2, (int) itr.next());
@@ -153,7 +153,7 @@ public class ConcurrentIterableLinkedQueueTest {
public void testRemoveAgainstNewestItr() {
queue.add(1);
queue.add(2);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromLatest();
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromLatest();
Assert.assertEquals(2, queue.tryRemoveBefore(Long.MAX_VALUE));
queue.add(3);
@@ -164,7 +164,7 @@ public class ConcurrentIterableLinkedQueueTest {
public void testClear() {
queue.add(1);
queue.add(2);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(1);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(1);
queue.clear();
assertFalse(queue.hasAnyIterators());
@@ -186,12 +186,12 @@ public class ConcurrentIterableLinkedQueueTest {
Assert.assertEquals(1, queue.getFirstIndex());
Assert.assertEquals(2, queue.getTailIndex());
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it =
queue.iterateFromEarliest();
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it =
queue.iterateFromEarliest();
Assert.assertEquals(2, (int) it.next());
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 =
queue.iterateFromLatest();
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator it2 =
queue.iterateFromLatest();
Assert.assertEquals(2, it2.getNextIndex());
- AtomicInteger value = new AtomicInteger(-1);
+ final AtomicInteger value = new AtomicInteger(-1);
new Thread(() -> value.set(it2.next())).start();
queue.add(3);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(3,
value.get()));
@@ -206,10 +206,10 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testConcurrentReadWrite() {
- AtomicBoolean failure = new AtomicBoolean(false);
- List<Thread> threadList = new ArrayList<>(102);
+ final AtomicBoolean failure = new AtomicBoolean(false);
+ final List<Thread> threadList = new ArrayList<>(102);
- Thread thread1 =
+ final Thread thread1 =
new Thread(
() -> {
try {
@@ -223,7 +223,7 @@ public class ConcurrentIterableLinkedQueueTest {
threadList.add(thread1);
thread1.start();
- Thread thread2 =
+ final Thread thread2 =
new Thread(
() -> {
try {
@@ -238,7 +238,7 @@ public class ConcurrentIterableLinkedQueueTest {
thread2.start();
for (int i = 0; i < 100; ++i) {
- Thread thread =
+ final Thread thread =
new Thread(
() -> {
try {
@@ -282,13 +282,13 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testBoundaryConditions() {
queue.add(1);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(10);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(10);
Assert.assertFalse(itr.hasNext());
}
@Test(timeout = 60000)
public void testConcurrentExceptionHandling() throws InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(2);
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(
() -> {
@@ -301,11 +301,11 @@ public class ConcurrentIterableLinkedQueueTest {
queue.clear();
});
- AtomicBoolean caughtException = new AtomicBoolean(false);
+ final AtomicBoolean caughtException = new AtomicBoolean(false);
executor.submit(
() -> {
try {
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromEarliest();
while (itr.hasNext()) {
itr.next();
@@ -327,7 +327,7 @@ public class ConcurrentIterableLinkedQueueTest {
queue.add(i);
}
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
for (int i = 0; i < numberOfElements; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -337,8 +337,8 @@ public class ConcurrentIterableLinkedQueueTest {
@Test(timeout = 60000)
public void testMultiThreadedConsistency() throws InterruptedException {
- int numberOfElements = 1000;
- ExecutorService executor = Executors.newFixedThreadPool(10);
+ final int numberOfElements = 1000;
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < numberOfElements; i++) {
int finalI = i;
@@ -348,8 +348,8 @@ public class ConcurrentIterableLinkedQueueTest {
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromEarliest();
- HashSet<Integer> elements = new HashSet<>();
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFromEarliest();
+ final HashSet<Integer> elements = new HashSet<>();
while (itr.hasNext()) {
elements.add(itr.next());
}
@@ -363,7 +363,7 @@ public class ConcurrentIterableLinkedQueueTest {
queue.add(i);
}
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
for (int i = 0; i < 5; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -378,7 +378,7 @@ public class ConcurrentIterableLinkedQueueTest {
}
queue.tryRemoveBefore(3);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
for (int i = 3; i < 5; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -392,8 +392,8 @@ public class ConcurrentIterableLinkedQueueTest {
queue.add(i);
}
- ExecutorService executor = Executors.newFixedThreadPool(10);
- AtomicInteger count = new AtomicInteger(0);
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
+ final AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
executor.submit(
@@ -417,7 +417,7 @@ public class ConcurrentIterableLinkedQueueTest {
queue.add(i);
}
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
for (int i = 0; i < 3; i++) {
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(i), itr.next());
@@ -447,10 +447,20 @@ public class ConcurrentIterableLinkedQueueTest {
Assert.assertFalse(itr.hasNext());
}
+ @Test(timeout = 60000)
+ public void testIterateAfterRemoveFromEmptyQueue() {
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ Assert.assertFalse(itr.hasNext());
+ queue.tryRemoveBefore(0);
+ Assert.assertFalse(itr.hasNext());
+ itr.next(10);
+ Assert.assertEquals(0, itr.getNextIndex());
+ }
+
@Test(timeout = 60000)
public void testIteratorExceptionHandling() {
queue.add(1);
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
queue.clear();
Assert.assertFalse(itr.hasNext());
@@ -464,8 +474,8 @@ public class ConcurrentIterableLinkedQueueTest {
queue.add(i);
}
- ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
- long newNextIndex = itr.seek(5);
+ final ConcurrentIterableLinkedQueue<Integer>.DynamicIterator itr =
queue.iterateFrom(0);
+ final long newNextIndex = itr.seek(5);
Assert.assertEquals(5, newNextIndex);
Assert.assertTrue(itr.hasNext());
Assert.assertEquals(Integer.valueOf(5), itr.next());