This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6680d6f3dae Pipe: Fixed the bug that Disruptor may not clear the
reference & will wait long time after pipe close (#17549)
6680d6f3dae is described below
commit 6680d6f3daef443946b4209978cf1ce18352aacd
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 28 15:05:17 2026 +0800
Pipe: Fixed the bug that Disruptor may not clear the reference & will wait
long time after pipe close (#17549)
* fix
* fix
---
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../realtime/disruptor/BatchEventProcessor.java | 53 ++++--
.../dataregion/realtime/disruptor/Disruptor.java | 4 +
.../realtime/disruptor/SequenceBarrier.java | 4 +
.../listener/PipeInsertionDataNodeListener.java | 39 +++--
.../realtime/disruptor/DisruptorShutdownTest.java | 183 +++++++++++++++++++++
6 files changed, 257 insertions(+), 28 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index bdeebde8938..f40de994c31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -256,7 +256,7 @@ public class PipeDataRegionAssigner implements Closeable {
matcher.invalidateCache();
}
- public boolean notMoreExtractorNeededToBeAssigned() {
+ public boolean notMoreSourceNeededToBeAssigned() {
return matcher.getRegisterCount() == 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
index 34930be977e..d0432821cf7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -69,7 +69,6 @@ public final class BatchEventProcessor<T> implements Runnable
{
@Override
public void run() {
- T event = null;
long nextSequence = sequence.get() + 1L;
while (running) {
@@ -78,29 +77,59 @@ public final class BatchEventProcessor<T> implements
Runnable {
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// Batch process all available events
- while (nextSequence <= availableSequence) {
- event = ringBuffer.get(nextSequence);
- eventHandler.onEvent(event, nextSequence, nextSequence ==
availableSequence);
- nextSequence++;
- }
-
- // Update sequence
- sequence.set(availableSequence);
+ nextSequence = processAvailableEvents(nextSequence, availableSequence);
} catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- LOGGER.info("Processor interrupted");
+ if (running) {
+ Thread.currentThread().interrupt();
+ LOGGER.info("Processor interrupted");
+ }
break;
} catch (final Throwable ex) {
- exceptionHandler.handleEventException(ex, nextSequence, event);
+ exceptionHandler.handleEventException(ex, nextSequence,
ringBuffer.get(nextSequence));
sequence.set(nextSequence);
nextSequence++;
}
}
+ if (!running) {
+ drainRemainingPublishedEvents(nextSequence);
+ }
LOGGER.info("Processor stopped");
}
+ private long processAvailableEvents(long nextSequence, long
availableSequence) throws Throwable {
+ while (nextSequence <= availableSequence) {
+ final T event = ringBuffer.get(nextSequence);
+ eventHandler.onEvent(event, nextSequence, nextSequence ==
availableSequence);
+ nextSequence++;
+ }
+
+ sequence.set(availableSequence);
+ return nextSequence;
+ }
+
+ private void drainRemainingPublishedEvents(long nextSequence) {
+ final long availableSequence = sequenceBarrier.getCursor();
+ if (availableSequence < nextSequence) {
+ return;
+ }
+
+ final long highestPublishedSequence =
+ sequenceBarrier.getHighestPublishedSequence(nextSequence,
availableSequence);
+ while (nextSequence <= highestPublishedSequence) {
+ final T event = ringBuffer.get(nextSequence);
+ try {
+ eventHandler.onEvent(event, nextSequence, nextSequence ==
highestPublishedSequence);
+ } catch (final Throwable ex) {
+ exceptionHandler.handleEventException(ex, nextSequence, event);
+ } finally {
+ sequence.set(nextSequence);
+ }
+ nextSequence++;
+ }
+ }
+
private static class DefaultExceptionHandler<T> implements
ExceptionHandler<T> {
@Override
public void handleEventException(Throwable ex, long sequence, T event) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
index 57c6e853f61..f3a64701285 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
@@ -122,11 +122,15 @@ public class Disruptor<T> {
if (processorThread != null) {
try {
+ processorThread.interrupt();
processorThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted waiting for processor to stop");
}
+ if (processorThread.isAlive()) {
+ LOGGER.warn("Timed out waiting for processor to stop");
+ }
}
started = false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
index 4c8011eb1c2..80f41162fc7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
@@ -75,4 +75,8 @@ public class SequenceBarrier {
public long getCursor() {
return sequencer.getCursor().get();
}
+
+ public long getHighestPublishedSequence(long lowerBound, long
availableSequence) {
+ return sequencer.getHighestPublishedSequence(lowerBound,
availableSequence);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 3cce521a51e..fded546d87d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -72,25 +72,34 @@ public class PipeInsertionDataNodeListener {
public synchronized void stopListenAndAssign(
final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
- final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
- if (assigner == null) {
- return;
- }
+ PipeDataRegionAssigner assignerToClose = null;
- assigner.stopAssignTo(extractor);
+ synchronized (this) {
+ final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
+ if (assigner == null) {
+ return;
+ }
- if (extractor.isNeedListenToTsFile()) {
- listenToTsFileExtractorCount.decrementAndGet();
- }
- if (extractor.isNeedListenToInsertNode()) {
- listenToInsertNodeExtractorCount.decrementAndGet();
+ assigner.stopAssignTo(extractor);
+
+ if (extractor.isNeedListenToTsFile()) {
+ listenToTsFileExtractorCount.decrementAndGet();
+ }
+ if (extractor.isNeedListenToInsertNode()) {
+ listenToInsertNodeExtractorCount.decrementAndGet();
+ }
+
+ if (assigner.notMoreSourceNeededToBeAssigned()) {
+ // The removed assigner is the same as the one referenced by the
variable `assigner`
+ dataRegionId2Assigner.remove(dataRegionId);
+ // This will help to release the memory occupied by the assigner
+ assignerToClose = assigner;
+ }
}
- if (assigner.notMoreExtractorNeededToBeAssigned()) {
- // The removed assigner will is the same as the one referenced by the
variable `assigner`
- dataRegionId2Assigner.remove(dataRegionId);
- // This will help to release the memory occupied by the assigner
- assigner.close();
+ if (assignerToClose != null) {
+ // Closing the disruptor may block for a while, so keep it out of the
global listener lock.
+ assignerToClose.close();
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
new file mode 100644
index 00000000000..3fd40c4d4f2
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DisruptorShutdownTest {
+
+ @Test
+ public void
testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws
Exception {
+ final RingBuffer<TestEvent> ringBuffer =
RingBuffer.createMultiProducer(TestEvent::new, 32);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
1);
+
+ final TestSequenceBarrier barrier = new TestSequenceBarrier(0L);
+ final AtomicInteger handledEventCount = new AtomicInteger();
+ final BatchEventProcessor<TestEvent> processor =
+ new BatchEventProcessor<>(
+ ringBuffer,
+ barrier,
+ (event, sequence, endOfBatch) ->
handledEventCount.incrementAndGet());
+
+ final Thread processorThread = new Thread(processor,
"pipe-batch-event-processor-test");
+ processorThread.start();
+
+ Assert.assertTrue(barrier.awaitWaitForCall());
+ processor.halt();
+ barrier.interruptWait();
+
+ processorThread.join(TimeUnit.SECONDS.toMillis(5));
+
+ Assert.assertFalse(processorThread.isAlive());
+ Assert.assertEquals(1, handledEventCount.get());
+ Assert.assertEquals(0L, processor.getSequence().get());
+ }
+
+ @Test
+ public void
testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting()
+ throws Exception {
+ final RingBuffer<TestEvent> ringBuffer =
RingBuffer.createMultiProducer(TestEvent::new, 32);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
1);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
2);
+
+ final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L,
1L);
+ final AtomicInteger handledEventCount = new AtomicInteger();
+ final AtomicReference<BatchEventProcessor<TestEvent>> processorReference =
+ new AtomicReference<>();
+ final BatchEventProcessor<TestEvent> processor =
+ new BatchEventProcessor<>(
+ ringBuffer,
+ barrier,
+ (event, sequence, endOfBatch) -> {
+ handledEventCount.incrementAndGet();
+ if (event.value == 1) {
+ processorReference.get().halt();
+ }
+ });
+ processorReference.set(processor);
+
+ final Thread processorThread =
+ new Thread(processor, "pipe-batch-event-processor-snapshot-test");
+ processorThread.start();
+ processorThread.join(TimeUnit.SECONDS.toMillis(5));
+
+ Assert.assertFalse(processorThread.isAlive());
+ Assert.assertEquals(2, handledEventCount.get());
+ Assert.assertEquals(1L, processor.getSequence().get());
+ }
+
+ @Test
+ public void testDisruptorShutdownInterruptsWaitingProcessor() throws
Exception {
+ final AtomicReference<Thread> processorThreadReference = new
AtomicReference<>();
+ final ThreadFactory threadFactory =
+ runnable -> {
+ final Thread thread = new Thread(runnable,
"pipe-disruptor-shutdown-test");
+ processorThreadReference.set(thread);
+ return thread;
+ };
+
+ final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32,
threadFactory);
+ disruptor.handleEventsWith((event, sequence, endOfBatch) -> {});
+ disruptor.start();
+
+ final Thread processorThread = processorThreadReference.get();
+ Assert.assertNotNull(processorThread);
+
+ TimeUnit.MILLISECONDS.sleep(50);
+ disruptor.shutdown();
+
+ Assert.assertFalse(processorThread.isAlive());
+ }
+
+ private static class TestEvent {
+ private int value;
+ }
+
+ private static class TestSequenceBarrier extends SequenceBarrier {
+
+ private final long cursor;
+ private final CountDownLatch waitForCalled = new CountDownLatch(1);
+ private final CountDownLatch interruptWait = new CountDownLatch(1);
+
+ private TestSequenceBarrier(final long cursor) {
+ super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
+ this.cursor = cursor;
+ }
+
+ @Override
+ public long waitFor(final long sequence) throws InterruptedException {
+ waitForCalled.countDown();
+ interruptWait.await();
+ throw new InterruptedException();
+ }
+
+ @Override
+ public long getCursor() {
+ return cursor;
+ }
+
+ @Override
+ public long getHighestPublishedSequence(final long lowerBound, final long
availableSequence) {
+ return availableSequence;
+ }
+
+ private boolean awaitWaitForCall() throws InterruptedException {
+ return waitForCalled.await(5, TimeUnit.SECONDS);
+ }
+
+ private void interruptWait() {
+ interruptWait.countDown();
+ }
+ }
+
+ private static class SnapshotSequenceBarrier extends SequenceBarrier {
+
+ private final long waitForResult;
+ private final long cursor;
+
+ private SnapshotSequenceBarrier(final long waitForResult, final long
cursor) {
+ super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
+ this.waitForResult = waitForResult;
+ this.cursor = cursor;
+ }
+
+ @Override
+ public long waitFor(final long sequence) {
+ return waitForResult;
+ }
+
+ @Override
+ public long getCursor() {
+ return cursor;
+ }
+
+ @Override
+ public long getHighestPublishedSequence(final long lowerBound, final long
availableSequence) {
+ return availableSequence;
+ }
+ }
+}