This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b614d6d16b5 Pipe: Fixed a bug where dropping a pipe could get stuck when the
disruptor is interrupted or the ring buffer is full (#17673)
b614d6d16b5 is described below

commit b614d6d16b5ab2ed50216028e4467b165004f6fb
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 14:29:00 2026 +0800

    Pipe: Fixed a bug where dropping a pipe could get stuck when the disruptor is
interrupted or the ring buffer is full (#17673)
    
    * Fix interrupt
    
    * spotless
---
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  2 +
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  1 +
 .../realtime/assigner/DisruptorQueue.java          | 19 ++++++--
 .../realtime/assigner/PipeDataRegionAssigner.java  | 41 ++++++++++++++--
 .../realtime/disruptor/BatchEventProcessor.java    | 11 +++--
 .../realtime/disruptor/MultiProducerSequencer.java | 21 ++++++++
 .../dataregion/realtime/disruptor/RingBuffer.java  | 22 ++++++++-
 .../realtime/disruptor/DisruptorShutdownTest.java  | 56 ++++++++++++++++++++++
 8 files changed, 160 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 096aa07914b..f669934149d 100644
--- 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -544,6 +544,8 @@ public final class DataNodePipeMessages {
   public static final String PIPE_UNSUPPORTED_SOURCE_REALTIME_MODE_CREATE_A =
       "Pipe: Unsupported source realtime mode: {}, create a hybrid source.";
   public static final String PROCESSOR_INTERRUPTED = "Processor interrupted";
+  public static final String PROCESSOR_INTERRUPTED_UNEXPECTEDLY =
+      "Processor interrupted unexpectedly, continue running";
   public static final String PROCESSOR_STOPPED = "Processor stopped";
   public static final String SET_FOR_HISTORICAL_DELETION_EVENT =
       "[{}]Set {} for historical deletion event {}";
diff --git 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 4d514c19ba5..131a83cf808 100644
--- 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -521,6 +521,7 @@ public final class DataNodePipeMessages {
   public static final String PIPE_UNSUPPORTED_SOURCE_REALTIME_MODE_CREATE_A =
       "Pipe:不支持的 source realtime mode: {}, create a hybrid source。";
   public static final String PROCESSOR_INTERRUPTED = "处理器被中断";
+  public static final String PROCESSOR_INTERRUPTED_UNEXPECTEDLY = 
"处理器意外中断,继续运行";
   public static final String PROCESSOR_STOPPED = "处理器已停止";
   public static final String SET_FOR_HISTORICAL_DELETION_EVENT =
       "[{}]Set {} for historical deletion event {}";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index dc84095fb74..3ef29dbc900 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -88,16 +88,29 @@ public class DisruptorQueue {
   }
 
   public void publish(final PipeRealtimeEvent event) {
+    publishOrDrop(event);
+  }
+
+  public boolean publishOrDrop(final PipeRealtimeEvent event) {
     final EnrichedEvent innerEvent = event.getEvent();
     if (innerEvent instanceof PipeHeartbeatEvent) {
       ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
     }
-    ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
-    mayPrintExceedingLog();
+    final boolean published =
+        ringBuffer.publishEvent(
+            (container, sequence, o) -> container.setEvent(event), event, 
this::isClosed);
+    if (published) {
+      mayPrintExceedingLog();
+    }
+    return published;
   }
 
-  public void shutdown() {
+  public void closeInput() {
     isClosed = true;
+  }
+
+  public void shutdown() {
+    closeInput();
     // use shutdown instead of halt to ensure all published events have been 
handled
     disruptor.shutdown();
     allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 3f84e138a48..90466ba3389 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -71,6 +71,7 @@ public class PipeDataRegionAssigner implements Closeable {
   private volatile int listenToInsertNodeSourceCount = 0;
 
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
+  private int inFlightPublishCount = 0;
 
   public int getDataRegionId() {
     return dataRegionId;
@@ -104,14 +105,28 @@ public class PipeDataRegionAssigner implements Closeable {
       ((PipeHeartbeatEvent) innerEvent).onPublished();
     }
 
-    // use synchronized here for completely preventing reference count leaks 
under extreme thread
-    // scheduling when closing
     synchronized (this) {
-      if (!disruptor.isClosed()) {
-        disruptor.publish(event);
-      } else {
+      if (disruptor.isClosed()) {
         onAssignedHook(event);
+        return;
       }
+      inFlightPublishCount++;
+    }
+
+    boolean isPublished = false;
+    try {
+      isPublished = disruptor.publishOrDrop(event);
+    } finally {
+      synchronized (this) {
+        inFlightPublishCount--;
+        if (inFlightPublishCount == 0) {
+          notifyAll();
+        }
+      }
+    }
+
+    if (!isPublished) {
+      onAssignedHook(event);
     }
   }
 
@@ -276,9 +291,25 @@ public class PipeDataRegionAssigner implements Closeable {
   public synchronized void close() {
     PipeAssignerMetrics.getInstance().deregister(dataRegionId);
 
+    boolean interrupted = false;
+    disruptor.closeInput();
+    while (inFlightPublishCount > 0) {
+      try {
+        wait();
+      } catch (final InterruptedException e) {
+        interrupted = true;
+        LOGGER.warn(
+            "Interrupted while waiting for in-flight publishes to finish when 
closing assigner on data region {}.",
+            dataRegionId);
+      }
+    }
+
     final long startTime = System.currentTimeMillis();
     disruptor.shutdown();
     matcher.clear();
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
     LOGGER.info(
         DataNodePipeMessages.PIPE_ASSIGNER_ON_DATA_REGION_SHUTDOWN_INTERNAL,
         dataRegionId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
index 0227e07d5f0..824f7e84b9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -82,11 +82,14 @@ public final class BatchEventProcessor<T> implements 
Runnable {
         nextSequence = processAvailableEvents(nextSequence, availableSequence);
 
       } catch (final InterruptedException ex) {
-        if (running) {
-          Thread.currentThread().interrupt();
-          LOGGER.info(DataNodePipeMessages.PROCESSOR_INTERRUPTED);
+        if (!running) {
+          break;
         }
-        break;
+        // A transient interrupt should not permanently stop the consumer 
thread. Otherwise the
+        // gating sequence will stop advancing and producers may block forever 
on a full ring
+        // buffer, making the later close path appear stuck.
+        Thread.interrupted();
+        LOGGER.warn(DataNodePipeMessages.PROCESSOR_INTERRUPTED_UNEXPECTEDLY);
       } catch (final Throwable ex) {
         exceptionHandler.handleEventException(ex, nextSequence, 
ringBuffer.get(nextSequence));
         sequence.set(nextSequence);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
index 22a85da5ffd..cb0efe3039d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.BooleanSupplier;
 
 /**
  * Multi-producer sequencer for coordinating concurrent publishers
@@ -112,14 +113,31 @@ public final class MultiProducerSequencer {
    * @return highest claimed sequence number
    */
   public long next(int n) {
+    return next(n, () -> false);
+  }
+
+  /**
+   * Claim next n sequences for publishing, or abort if the caller is closing.
+   *
+   * @param n number of sequences to claim
+   * @param abortCondition returns {@code true} if the claim should be 
abandoned
+   * @return highest claimed sequence number, or {@link 
Sequence#INITIAL_VALUE} if aborted
+   */
+  public long next(final int n, final BooleanSupplier abortCondition) {
     if (n < 1) {
       throw new IllegalArgumentException(DataNodePipeMessages.N_MUST_BE_0);
     }
 
+    final BooleanSupplier effectiveAbortCondition =
+        abortCondition != null ? abortCondition : () -> false;
     long current;
     long next;
 
     do {
+      if (effectiveAbortCondition.getAsBoolean()) {
+        return Sequence.INITIAL_VALUE;
+      }
+
       current = cursor.get();
       next = current + n;
 
@@ -130,6 +148,9 @@ public final class MultiProducerSequencer {
         long gatingSequence = 
Sequence.getMinimumSequence(gatingSequences.get(), current);
 
         if (wrapPoint > gatingSequence) {
+          if (effectiveAbortCondition.getAsBoolean()) {
+            return Sequence.INITIAL_VALUE;
+          }
           LockSupport.parkNanos(1);
           continue;
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
index 4f7ba9e44b7..ea94cc85300 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
@@ -21,6 +21,8 @@ package 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
 
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 
+import java.util.function.BooleanSupplier;
+
 /**
  * Left-hand side padding for cache line alignment
  *
@@ -207,8 +209,26 @@ public final class RingBuffer<E> extends 
RingBufferFields<E> {
    * @param <A> argument type
    */
   public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) {
-    final long sequence = sequencer.next(1);
+    publishEvent(translator, arg0, () -> false);
+  }
+
+  /**
+   * Publish event using a translator function, or abort if the caller is 
closing.
+   *
+   * @param translator function to populate the event
+   * @param arg0 argument passed to translator
+   * @param abortCondition returns {@code true} if the publish should be 
abandoned
+   * @param <A> argument type
+   * @return {@code true} if the event is published, {@code false} if the 
publish is aborted
+   */
+  public <A> boolean publishEvent(
+      final EventTranslator<E, A> translator, final A arg0, final 
BooleanSupplier abortCondition) {
+    final long sequence = sequencer.next(1, abortCondition);
+    if (sequence == Sequence.INITIAL_VALUE) {
+      return false;
+    }
     translateAndPublish(translator, sequence, arg0);
+    return true;
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
index 3fd40c4d4f2..ef57ea625ae 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -113,6 +114,61 @@ public class DisruptorShutdownTest {
     Assert.assertFalse(processorThread.isAlive());
   }
 
+  @Test
+  public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception {
+    final AtomicReference<Thread> processorThreadReference = new 
AtomicReference<>();
+    final ThreadFactory threadFactory =
+        runnable -> {
+          final Thread thread = new Thread(runnable, 
"pipe-disruptor-unexpected-interrupt-test");
+          processorThreadReference.set(thread);
+          return thread;
+        };
+
+    final CountDownLatch handled = new CountDownLatch(1);
+    final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, 
threadFactory);
+    final RingBuffer<TestEvent> ringBuffer =
+        disruptor.handleEventsWith((event, sequence, endOfBatch) -> 
handled.countDown()).start();
+
+    final Thread processorThread = processorThreadReference.get();
+    Assert.assertNotNull(processorThread);
+
+    TimeUnit.MILLISECONDS.sleep(50);
+    processorThread.interrupt();
+
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
1);
+    Assert.assertTrue(handled.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue(processorThread.isAlive());
+
+    disruptor.shutdown();
+    Assert.assertFalse(processorThread.isAlive());
+  }
+
+  @Test
+  public void testPublishEventCanAbortWhenClosingWhileBufferIsFull() throws 
Exception {
+    final RingBuffer<TestEvent> ringBuffer = 
RingBuffer.createMultiProducer(TestEvent::new, 1);
+    final Sequence gatingSequence = new Sequence();
+    ringBuffer.addGatingSequences(gatingSequence);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
1);
+
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final AtomicBoolean published = new AtomicBoolean(true);
+    final Thread publisherThread =
+        new Thread(
+            () ->
+                published.set(
+                    ringBuffer.publishEvent(
+                        (event, sequence, value) -> event.value = value, 2, 
isClosed::get)),
+            "pipe-disruptor-publish-abort-test");
+
+    publisherThread.start();
+    TimeUnit.MILLISECONDS.sleep(50);
+    isClosed.set(true);
+    publisherThread.join(TimeUnit.SECONDS.toMillis(5));
+
+    Assert.assertFalse(publisherThread.isAlive());
+    Assert.assertFalse(published.get());
+  }
+
   private static class TestEvent {
     private int value;
   }

Reply via email to