This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6680d6f3dae Pipe: Fixed the bug that Disruptor may not clear the 
reference & will wait long time after pipe close (#17549)
6680d6f3dae is described below

commit 6680d6f3daef443946b4209978cf1ce18352aacd
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 28 15:05:17 2026 +0800

    Pipe: Fixed the bug that Disruptor may not clear the reference & will wait 
long time after pipe close (#17549)
    
    * fix
    
    * fix
---
 .../realtime/assigner/PipeDataRegionAssigner.java  |   2 +-
 .../realtime/disruptor/BatchEventProcessor.java    |  53 ++++--
 .../dataregion/realtime/disruptor/Disruptor.java   |   4 +
 .../realtime/disruptor/SequenceBarrier.java        |   4 +
 .../listener/PipeInsertionDataNodeListener.java    |  39 +++--
 .../realtime/disruptor/DisruptorShutdownTest.java  | 183 +++++++++++++++++++++
 6 files changed, 257 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index bdeebde8938..f40de994c31 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -256,7 +256,7 @@ public class PipeDataRegionAssigner implements Closeable {
     matcher.invalidateCache();
   }
 
-  public boolean notMoreExtractorNeededToBeAssigned() {
+  public boolean notMoreSourceNeededToBeAssigned() {
     return matcher.getRegisterCount() == 0;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
index 34930be977e..d0432821cf7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -69,7 +69,6 @@ public final class BatchEventProcessor<T> implements Runnable 
{
 
   @Override
   public void run() {
-    T event = null;
     long nextSequence = sequence.get() + 1L;
 
     while (running) {
@@ -78,29 +77,59 @@ public final class BatchEventProcessor<T> implements 
Runnable {
         final long availableSequence = sequenceBarrier.waitFor(nextSequence);
 
         // Batch process all available events
-        while (nextSequence <= availableSequence) {
-          event = ringBuffer.get(nextSequence);
-          eventHandler.onEvent(event, nextSequence, nextSequence == 
availableSequence);
-          nextSequence++;
-        }
-
-        // Update sequence
-        sequence.set(availableSequence);
+        nextSequence = processAvailableEvents(nextSequence, availableSequence);
 
       } catch (final InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        LOGGER.info("Processor interrupted");
+        if (running) {
+          Thread.currentThread().interrupt();
+          LOGGER.info("Processor interrupted");
+        }
         break;
       } catch (final Throwable ex) {
-        exceptionHandler.handleEventException(ex, nextSequence, event);
+        exceptionHandler.handleEventException(ex, nextSequence, 
ringBuffer.get(nextSequence));
         sequence.set(nextSequence);
         nextSequence++;
       }
     }
 
+    if (!running) {
+      drainRemainingPublishedEvents(nextSequence);
+    }
     LOGGER.info("Processor stopped");
   }
 
+  private long processAvailableEvents(long nextSequence, long 
availableSequence) throws Throwable {
+    while (nextSequence <= availableSequence) {
+      final T event = ringBuffer.get(nextSequence);
+      eventHandler.onEvent(event, nextSequence, nextSequence == 
availableSequence);
+      nextSequence++;
+    }
+
+    sequence.set(availableSequence);
+    return nextSequence;
+  }
+
+  private void drainRemainingPublishedEvents(long nextSequence) {
+    final long availableSequence = sequenceBarrier.getCursor();
+    if (availableSequence < nextSequence) {
+      return;
+    }
+
+    final long highestPublishedSequence =
+        sequenceBarrier.getHighestPublishedSequence(nextSequence, 
availableSequence);
+    while (nextSequence <= highestPublishedSequence) {
+      final T event = ringBuffer.get(nextSequence);
+      try {
+        eventHandler.onEvent(event, nextSequence, nextSequence == 
highestPublishedSequence);
+      } catch (final Throwable ex) {
+        exceptionHandler.handleEventException(ex, nextSequence, event);
+      } finally {
+        sequence.set(nextSequence);
+      }
+      nextSequence++;
+    }
+  }
+
   private static class DefaultExceptionHandler<T> implements 
ExceptionHandler<T> {
     @Override
     public void handleEventException(Throwable ex, long sequence, T event) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
index 57c6e853f61..f3a64701285 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
@@ -122,11 +122,15 @@ public class Disruptor<T> {
 
     if (processorThread != null) {
       try {
+        processorThread.interrupt();
         processorThread.join(5000);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         LOGGER.warn("Interrupted waiting for processor to stop");
       }
+      if (processorThread.isAlive()) {
+        LOGGER.warn("Timed out waiting for processor to stop");
+      }
     }
 
     started = false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
index 4c8011eb1c2..80f41162fc7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
@@ -75,4 +75,8 @@ public class SequenceBarrier {
   public long getCursor() {
     return sequencer.getCursor().get();
   }
+
+  public long getHighestPublishedSequence(long lowerBound, long 
availableSequence) {
+    return sequencer.getHighestPublishedSequence(lowerBound, 
availableSequence);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 3cce521a51e..fded546d87d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -72,25 +72,34 @@ public class PipeInsertionDataNodeListener {
 
   public synchronized void stopListenAndAssign(
       final int dataRegionId, final PipeRealtimeDataRegionSource extractor) {
-    final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
-    if (assigner == null) {
-      return;
-    }
+    PipeDataRegionAssigner assignerToClose = null;
 
-    assigner.stopAssignTo(extractor);
+    synchronized (this) {
+      final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
+      if (assigner == null) {
+        return;
+      }
 
-    if (extractor.isNeedListenToTsFile()) {
-      listenToTsFileExtractorCount.decrementAndGet();
-    }
-    if (extractor.isNeedListenToInsertNode()) {
-      listenToInsertNodeExtractorCount.decrementAndGet();
+      assigner.stopAssignTo(extractor);
+
+      if (extractor.isNeedListenToTsFile()) {
+        listenToTsFileExtractorCount.decrementAndGet();
+      }
+      if (extractor.isNeedListenToInsertNode()) {
+        listenToInsertNodeExtractorCount.decrementAndGet();
+      }
+
+      if (assigner.notMoreSourceNeededToBeAssigned()) {
+        // The removed assigner is the same as the one referenced by the 
variable `assigner`
+        dataRegionId2Assigner.remove(dataRegionId);
+        // This will help to release the memory occupied by the assigner
+        assignerToClose = assigner;
+      }
     }
 
-    if (assigner.notMoreExtractorNeededToBeAssigned()) {
-      // The removed assigner will is the same as the one referenced by the 
variable `assigner`
-      dataRegionId2Assigner.remove(dataRegionId);
-      // This will help to release the memory occupied by the assigner
-      assigner.close();
+    if (assignerToClose != null) {
+      // Closing the disruptor may block for a while, so keep it out of the 
global listener lock.
+      assignerToClose.close();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
new file mode 100644
index 00000000000..3fd40c4d4f2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DisruptorShutdownTest {
+
+  @Test
+  public void 
testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws 
Exception {
+    final RingBuffer<TestEvent> ringBuffer = 
RingBuffer.createMultiProducer(TestEvent::new, 32);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
1);
+
+    final TestSequenceBarrier barrier = new TestSequenceBarrier(0L);
+    final AtomicInteger handledEventCount = new AtomicInteger();
+    final BatchEventProcessor<TestEvent> processor =
+        new BatchEventProcessor<>(
+            ringBuffer,
+            barrier,
+            (event, sequence, endOfBatch) -> 
handledEventCount.incrementAndGet());
+
+    final Thread processorThread = new Thread(processor, 
"pipe-batch-event-processor-test");
+    processorThread.start();
+
+    Assert.assertTrue(barrier.awaitWaitForCall());
+    processor.halt();
+    barrier.interruptWait();
+
+    processorThread.join(TimeUnit.SECONDS.toMillis(5));
+
+    Assert.assertFalse(processorThread.isAlive());
+    Assert.assertEquals(1, handledEventCount.get());
+    Assert.assertEquals(0L, processor.getSequence().get());
+  }
+
+  @Test
+  public void 
testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting()
+      throws Exception {
+    final RingBuffer<TestEvent> ringBuffer = 
RingBuffer.createMultiProducer(TestEvent::new, 32);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
1);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
2);
+
+    final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L, 
1L);
+    final AtomicInteger handledEventCount = new AtomicInteger();
+    final AtomicReference<BatchEventProcessor<TestEvent>> processorReference =
+        new AtomicReference<>();
+    final BatchEventProcessor<TestEvent> processor =
+        new BatchEventProcessor<>(
+            ringBuffer,
+            barrier,
+            (event, sequence, endOfBatch) -> {
+              handledEventCount.incrementAndGet();
+              if (event.value == 1) {
+                processorReference.get().halt();
+              }
+            });
+    processorReference.set(processor);
+
+    final Thread processorThread =
+        new Thread(processor, "pipe-batch-event-processor-snapshot-test");
+    processorThread.start();
+    processorThread.join(TimeUnit.SECONDS.toMillis(5));
+
+    Assert.assertFalse(processorThread.isAlive());
+    Assert.assertEquals(2, handledEventCount.get());
+    Assert.assertEquals(1L, processor.getSequence().get());
+  }
+
+  @Test
+  public void testDisruptorShutdownInterruptsWaitingProcessor() throws 
Exception {
+    final AtomicReference<Thread> processorThreadReference = new 
AtomicReference<>();
+    final ThreadFactory threadFactory =
+        runnable -> {
+          final Thread thread = new Thread(runnable, 
"pipe-disruptor-shutdown-test");
+          processorThreadReference.set(thread);
+          return thread;
+        };
+
+    final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, 
threadFactory);
+    disruptor.handleEventsWith((event, sequence, endOfBatch) -> {});
+    disruptor.start();
+
+    final Thread processorThread = processorThreadReference.get();
+    Assert.assertNotNull(processorThread);
+
+    TimeUnit.MILLISECONDS.sleep(50);
+    disruptor.shutdown();
+
+    Assert.assertFalse(processorThread.isAlive());
+  }
+
+  private static class TestEvent {
+    private int value;
+  }
+
+  private static class TestSequenceBarrier extends SequenceBarrier {
+
+    private final long cursor;
+    private final CountDownLatch waitForCalled = new CountDownLatch(1);
+    private final CountDownLatch interruptWait = new CountDownLatch(1);
+
+    private TestSequenceBarrier(final long cursor) {
+      super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
+      this.cursor = cursor;
+    }
+
+    @Override
+    public long waitFor(final long sequence) throws InterruptedException {
+      waitForCalled.countDown();
+      interruptWait.await();
+      throw new InterruptedException();
+    }
+
+    @Override
+    public long getCursor() {
+      return cursor;
+    }
+
+    @Override
+    public long getHighestPublishedSequence(final long lowerBound, final long 
availableSequence) {
+      return availableSequence;
+    }
+
+    private boolean awaitWaitForCall() throws InterruptedException {
+      return waitForCalled.await(5, TimeUnit.SECONDS);
+    }
+
+    private void interruptWait() {
+      interruptWait.countDown();
+    }
+  }
+
+  private static class SnapshotSequenceBarrier extends SequenceBarrier {
+
+    private final long waitForResult;
+    private final long cursor;
+
+    private SnapshotSequenceBarrier(final long waitForResult, final long 
cursor) {
+      super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
+      this.waitForResult = waitForResult;
+      this.cursor = cursor;
+    }
+
+    @Override
+    public long waitFor(final long sequence) {
+      return waitForResult;
+    }
+
+    @Override
+    public long getCursor() {
+      return cursor;
+    }
+
+    @Override
+    public long getHighestPublishedSequence(final long lowerBound, final long 
availableSequence) {
+      return availableSequence;
+    }
+  }
+}

Reply via email to