This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 250d97a32a1 [To dev/1.3] Pipe: Fixed the bug that Disruptor may not 
clear the reference & will wait long time after pipe close (#17549) (#17569)
250d97a32a1 is described below

commit 250d97a32a16a8334dbb1012638bab4cafdf453a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 11:19:04 2026 +0800

    [To dev/1.3] Pipe: Fixed the bug that Disruptor may not clear the reference 
& will wait long time after pipe close (#17549) (#17569)
    
    * Pipe: Implementing DisruptorQueue (#16639)
    
    * Pipe: Fixed the bug that Disruptor may not clear the reference & will 
wait long time after pipe close (#17549)
    
    * fix
    
    * fix
    
    * del
    
    ---------
    
    Co-authored-by: Zhenyu Luo <[email protected]>
---
 LICENSE                                            |  12 +
 iotdb-core/datanode/pom.xml                        |   4 -
 .../event/common/heartbeat/PipeHeartbeatEvent.java |   4 +-
 .../realtime/assigner/DisruptorQueue.java          |  15 +-
 .../assigner/DisruptorQueueExceptionHandler.java   |   3 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   2 +-
 .../realtime/disruptor/BatchEventProcessor.java    | 149 +++++++++++
 .../dataregion/realtime/disruptor/Disruptor.java   | 139 ++++++++++
 .../EventFactory.java}                             |  40 ++-
 .../EventHandler.java}                             |  43 ++-
 .../ExceptionHandler.java}                         |  43 +--
 .../realtime/disruptor/MultiProducerSequencer.java | 259 ++++++++++++++++++
 .../dataregion/realtime/disruptor/RingBuffer.java  | 295 +++++++++++++++++++++
 .../dataregion/realtime/disruptor/Sequence.java    | 122 +++++++++
 .../realtime/disruptor/SequenceBarrier.java        |  82 ++++++
 .../realtime/disruptor/SequenceGroups.java         |  77 ++++++
 .../listener/PipeInsertionDataNodeListener.java    |  45 ++--
 .../realtime/disruptor/DisruptorShutdownTest.java  | 183 +++++++++++++
 pom.xml                                            |   6 -
 19 files changed, 1415 insertions(+), 108 deletions(-)

diff --git a/LICENSE b/LICENSE
index 3f3de45e81a..0a1e5e42a9e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -303,3 +303,15 @@ The following files include code modified from Dropwizard 
Metrics project.
 Copyright (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
 Project page: https://github.com/dropwizard/metrics
 License: https://github.com/dropwizard/metrics/blob/release/4.2.x/LICENSE
+
+--------------------------------------------------------------------------------
+
+The following files include code modified from LMax Disruptor project.
+
+./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/*
+
+LMax Disruptor is open source software licensed under the Apache License 2.0 
and supported by the Apache Software Foundation.
+Project page: https://github.com/LMAX-Exchange/disruptor
+License: https://github.com/LMAX-Exchange/disruptor/blob/master/LICENCE.txt
+
+--------------------------------------------------------------------------------
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index d6f8d124345..299e50f1be9 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -308,10 +308,6 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.lmax</groupId>
-            <artifactId>disruptor</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.java-websocket</groupId>
             <artifactId>Java-WebSocket</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index ad29b285442..567e14f4fe3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -27,10 +27,10 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.event.Event;
 
-import com.lmax.disruptor.RingBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,7 +183,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   /////////////////////////////// Queue size Reporting 
///////////////////////////////
 
-  public void recordDisruptorSize(final RingBuffer<?> ringBuffer) {
+  public void recordDisruptorSize(final RingBuffer ringBuffer) {
     if (shouldPrintMessage) {
       disruptorSize = ringBuffer.getBufferSize() - (int) 
ringBuffer.remainingCapacity();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 2a2fa110749..52ac137ae4e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -26,12 +26,10 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
 
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,9 +73,8 @@ public class DisruptorQueue {
                 32,
                 Math.toIntExact(
                     allocatedMemoryBlock.getMemoryUsageInBytes() / 
ringBufferEntrySizeInBytes)),
-            THREAD_FACTORY,
-            ProducerType.MULTI,
-            new BlockingWaitStrategy());
+            THREAD_FACTORY);
+
     disruptor.handleEventsWith(
         (container, sequence, endOfBatch) -> {
           final PipeRealtimeEvent realtimeEvent = container.getEvent();
@@ -127,7 +124,7 @@ public class DisruptorQueue {
 
   private static class EventContainer {
 
-    private PipeRealtimeEvent event;
+    private volatile PipeRealtimeEvent event;
 
     private EventContainer() {}
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
index 91ad0224fc5..5330f3486f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
@@ -19,7 +19,8 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
 
-import com.lmax.disruptor.ExceptionHandler;
+import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.ExceptionHandler;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e3067a13126..0b4eb547144 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -202,7 +202,7 @@ public class PipeDataRegionAssigner implements Closeable {
     matcher.deregister(extractor);
   }
 
-  public boolean notMoreExtractorNeededToBeAssigned() {
+  public boolean notMoreSourceNeededToBeAssigned() {
     return matcher.getRegisterCount() == 0;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
new file mode 100644
index 00000000000..d0432821cf7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Batch event processor for consuming events
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and simplified for IoTDB's Pipe module (removed complex lifecycle 
management).
+ *
+ * <p>Core algorithm preserved from LMAX Disruptor:
+ *
+ * <ul>
+ *   <li>Batch processing loop
+ *   <li>Sequence tracking
+ *   <li>endOfBatch detection
+ * </ul>
+ *
+ * @param <T> event type
+ */
+public final class BatchEventProcessor<T> implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BatchEventProcessor.class);
+
+  private final RingBuffer<T> ringBuffer;
+  private final SequenceBarrier sequenceBarrier;
+  private final EventHandler<? super T> eventHandler;
+  /** Sequence of the last event this processor has finished with. */
+  private final Sequence sequence = new Sequence();
+  private ExceptionHandler<? super T> exceptionHandler = new DefaultExceptionHandler<>();
+  /** Cleared by {@link #halt()} to make {@link #run()} leave its loop. */
+  private volatile boolean running = true;
+
+  /**
+   * Creates a processor that reads events from {@code ringBuffer}, waits on {@code barrier}
+   * and hands every event to {@code eventHandler}.
+   *
+   * @param ringBuffer buffer the events are read from
+   * @param barrier barrier used to wait for published sequences
+   * @param eventHandler callback invoked once per event
+   */
+  public BatchEventProcessor(
+      RingBuffer<T> ringBuffer, SequenceBarrier barrier, EventHandler<? super T> eventHandler) {
+    this.ringBuffer = ringBuffer;
+    this.sequenceBarrier = barrier;
+    this.eventHandler = eventHandler;
+  }
+
+  /** Returns the sequence object tracking this processor's progress. */
+  public Sequence getSequence() {
+    return sequence;
+  }
+
+  /** Replaces the handler that is notified when {@code eventHandler} throws. */
+  public void setExceptionHandler(ExceptionHandler<? super T> exceptionHandler) {
+    this.exceptionHandler = exceptionHandler;
+  }
+
+  /** Requests the processing loop to stop; remaining published events are drained on exit. */
+  public void halt() {
+    running = false;
+  }
+
+  /**
+   * Consumes events until {@link #halt()} is called or the thread is interrupted.
+   *
+   * <p>The loop position is advanced per event inside this method. When the handler throws,
+   * the exception handler therefore receives the sequence and event that actually failed,
+   * only that event is skipped, and events already handled earlier in the same batch are not
+   * delivered a second time.
+   */
+  @Override
+  public void run() {
+    long nextSequence = sequence.get() + 1L;
+
+    while (running) {
+      try {
+        // Wait for available sequence
+        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
+
+        // Batch process all available events. nextSequence is advanced in this frame (not in
+        // a helper working on a copy) so the catch blocks below see the failing position.
+        while (nextSequence <= availableSequence) {
+          final T event = ringBuffer.get(nextSequence);
+          eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
+          nextSequence++;
+        }
+
+        sequence.set(availableSequence);
+      } catch (final InterruptedException ex) {
+        if (running) {
+          Thread.currentThread().interrupt();
+          LOGGER.info("Processor interrupted");
+        }
+        break;
+      } catch (final Throwable ex) {
+        // Report and skip exactly the event at nextSequence, then carry on with the next one.
+        exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence));
+        sequence.set(nextSequence);
+        nextSequence++;
+      }
+    }
+
+    if (!running) {
+      drainRemainingPublishedEvents(nextSequence);
+    }
+    LOGGER.info("Processor stopped");
+  }
+
+  /**
+   * Handles the events that were already published when the processor was halted, starting
+   * at {@code nextSequence}. A failing event is reported to the exception handler and the
+   * sequence is advanced regardless, so draining always makes progress.
+   */
+  private void drainRemainingPublishedEvents(long nextSequence) {
+    final long availableSequence = sequenceBarrier.getCursor();
+    if (availableSequence < nextSequence) {
+      return;
+    }
+
+    final long highestPublishedSequence =
+        sequenceBarrier.getHighestPublishedSequence(nextSequence, availableSequence);
+    while (nextSequence <= highestPublishedSequence) {
+      final T event = ringBuffer.get(nextSequence);
+      try {
+        eventHandler.onEvent(event, nextSequence, nextSequence == highestPublishedSequence);
+      } catch (final Throwable ex) {
+        exceptionHandler.handleEventException(ex, nextSequence, event);
+      } finally {
+        sequence.set(nextSequence);
+      }
+      nextSequence++;
+    }
+  }
+
+  /** Fallback handler used until another one is set: logs every failure at error level. */
+  private static class DefaultExceptionHandler<T> implements ExceptionHandler<T> {
+    @Override
+    public void handleEventException(Throwable ex, long sequence, T event) {
+      LoggerFactory.getLogger(getClass()).error("Exception processing: {} {}", sequence, event, ex);
+    }
+
+    @Override
+    public void handleOnStartException(Throwable ex) {
+      LoggerFactory.getLogger(getClass()).error("Exception during onStart()", ex);
+    }
+
+    @Override
+    public void handleOnShutdownException(Throwable ex) {
+      LoggerFactory.getLogger(getClass()).error("Exception during onShutdown()", ex);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
new file mode 100644
index 00000000000..f3a64701285
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Simplified Disruptor implementation for IoTDB Pipe
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and simplified for IoTDB's specific use case in the Pipe module.
+ *
+ * <p>Key simplifications:
+ *
+ * <ul>
+ *   <li>Single event handler support (no complex dependency graphs)
+ *   <li>Simplified lifecycle management
+ *   <li>Removed wait strategies (using simple sleep-based waiting)
+ * </ul>
+ *
+ * @param <T> event type
+ */
+public class Disruptor<T> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Disruptor.class);
+
+  /** Upper bound, in milliseconds, that {@link #shutdown()} waits for the consumer thread. */
+  private static final long SHUTDOWN_JOIN_TIMEOUT_MS = 5000L;
+
+  private final RingBuffer<T> ringBuffer;
+  private final ThreadFactory threadFactory;
+  private BatchEventProcessor<T> processor;
+  private Thread processorThread;
+  private ExceptionHandler<? super T> exceptionHandler;
+  private volatile boolean started = false;
+
+  /**
+   * Create a Disruptor instance
+   *
+   * @param eventFactory factory for creating pre-allocated events
+   * @param ringBufferSize buffer size (must be power of 2)
+   * @param threadFactory factory for creating consumer thread
+   */
+  public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, ThreadFactory threadFactory) {
+    this.ringBuffer = RingBuffer.createMultiProducer(eventFactory, ringBufferSize);
+    this.threadFactory = threadFactory;
+  }
+
+  /**
+   * Configure event handler for processing events
+   *
+   * <p>Creates a batch event processor that will run in its own thread
+   *
+   * <p>NOTE(review): a second call replaces the processor, but the gating sequence added for
+   * the earlier one is not removed here. Confirm callers invoke this exactly once.
+   *
+   * @param handler event handler implementation
+   * @return this instance for method chaining
+   */
+  public Disruptor<T> handleEventsWith(final EventHandler<? super T> handler) {
+    SequenceBarrier barrier = ringBuffer.newBarrier();
+    processor = new BatchEventProcessor<>(ringBuffer, barrier, handler);
+
+    if (exceptionHandler != null) {
+      processor.setExceptionHandler(exceptionHandler);
+    }
+
+    ringBuffer.addGatingSequences(processor.getSequence());
+    return this;
+  }
+
+  /**
+   * Set exception handler for error handling
+   *
+   * <p>Applied to the current processor, if any, and to a processor configured later.
+   *
+   * @param exceptionHandler handler for processing exceptions
+   */
+  public void setDefaultExceptionHandler(ExceptionHandler<? super T> exceptionHandler) {
+    this.exceptionHandler = exceptionHandler;
+    if (processor != null) {
+      processor.setExceptionHandler(exceptionHandler);
+    }
+  }
+
+  /**
+   * Starts the consumer thread for the configured event handler.
+   *
+   * @return the ring buffer owned by this instance
+   * @throws IllegalStateException if already started or no event handler was configured
+   */
+  public RingBuffer<T> start() {
+    if (started) {
+      throw new IllegalStateException("Disruptor already started");
+    }
+
+    if (processor == null) {
+      throw new IllegalStateException("No event handler configured");
+    }
+
+    processorThread = threadFactory.newThread(processor);
+    processorThread.start();
+    started = true;
+
+    LOGGER.info("Disruptor started with buffer size: {}", ringBuffer.getBufferSize());
+    return ringBuffer;
+  }
+
+  /**
+   * Halts the processor, interrupts its thread and waits a bounded time for it to exit.
+   *
+   * <p>Does nothing when the instance has not been started. If the calling thread is
+   * interrupted while waiting, its interrupt flag is restored and only the interruption is
+   * logged; the timeout warning is reserved for a wait that really ran out of time.
+   */
+  public void shutdown() {
+    if (!started) {
+      return;
+    }
+
+    if (processor != null) {
+      processor.halt();
+    }
+
+    if (processorThread != null) {
+      try {
+        processorThread.interrupt();
+        processorThread.join(SHUTDOWN_JOIN_TIMEOUT_MS);
+        // Only a join that returned normally can have timed out; an interrupted wait is
+        // reported by the catch block instead of being mislabelled as a timeout.
+        if (processorThread.isAlive()) {
+          LOGGER.warn("Timed out waiting for processor to stop");
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("Interrupted waiting for processor to stop");
+      }
+    }
+
+    started = false;
+    LOGGER.info("Disruptor shutdown completed");
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java
index 91ad0224fc5..785033c6824 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java
@@ -17,28 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
 
-import com.lmax.disruptor.ExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueExceptionHandler implements 
ExceptionHandler<Object> {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
-
-  @Override
-  public void handleEventException(final Throwable ex, final long sequence, 
final Object event) {
-    LOGGER.error("Exception processing: {} {}", sequence, event, ex);
-  }
-
-  @Override
-  public void handleOnStartException(final Throwable ex) {
-    LOGGER.warn("Exception during onStart()", ex);
-  }
-
-  @Override
-  public void handleOnShutdownException(final Throwable ex) {
-    LOGGER.warn("Exception during onShutdown()", ex);
-  }
+/**
+ * Event factory for pre-allocating events in RingBuffer
+ *
+ * <p>This interface is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor) and
+ * adapted for IoTDB's Pipe module.
+ *
+ * @param <T> event type
+ */
+@FunctionalInterface
+public interface EventFactory<T> {
+  /**
+   * Create new event instance
+   *
+   * @return new event
+   */
+  T newInstance();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java
index 91ad0224fc5..1fc81a37623 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java
@@ -17,28 +17,25 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
 
-import com.lmax.disruptor.ExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueExceptionHandler implements 
ExceptionHandler<Object> {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
-
-  @Override
-  public void handleEventException(final Throwable ex, final long sequence, 
final Object event) {
-    LOGGER.error("Exception processing: {} {}", sequence, event, ex);
-  }
-
-  @Override
-  public void handleOnStartException(final Throwable ex) {
-    LOGGER.warn("Exception during onStart()", ex);
-  }
-
-  @Override
-  public void handleOnShutdownException(final Throwable ex) {
-    LOGGER.warn("Exception during onShutdown()", ex);
-  }
+/**
+ * Event handler for processing events from RingBuffer
+ *
+ * <p>This interface is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor) and
+ * adapted for IoTDB's Pipe module.
+ *
+ * @param <T> event type
+ */
+@FunctionalInterface
+public interface EventHandler<T> {
+  /**
+   * Handle event
+   *
+   * @param event the event
+   * @param sequence sequence number
+   * @param endOfBatch whether this is the last event in current batch
+   * @throws Exception if processing fails
+   */
+  void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java
index 91ad0224fc5..28396b51ffe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java
@@ -17,28 +17,29 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
 
-import com.lmax.disruptor.ExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueExceptionHandler implements 
ExceptionHandler<Object> {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
-
-  @Override
-  public void handleEventException(final Throwable ex, final long sequence, 
final Object event) {
-    LOGGER.error("Exception processing: {} {}", sequence, event, ex);
-  }
+/**
+ * Exception handler for event processing errors
+ *
+ * <p>This interface is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor) and
+ * adapted for IoTDB's Pipe module.
+ *
+ * @param <T> event type
+ */
+public interface ExceptionHandler<T> {
+  /**
+   * Handle exception during event processing
+   *
+   * @param ex exception
+   * @param sequence sequence number
+   * @param event the event
+   */
+  void handleEventException(Throwable ex, long sequence, T event);
 
-  @Override
-  public void handleOnStartException(final Throwable ex) {
-    LOGGER.warn("Exception during onStart()", ex);
-  }
+  /** Handle exception during processor start */
+  void handleOnStartException(Throwable ex);
 
-  @Override
-  public void handleOnShutdownException(final Throwable ex) {
-    LOGGER.warn("Exception during onShutdown()", ex);
-  }
+  /** Handle exception during processor shutdown */
+  void handleOnShutdownException(Throwable ex);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
new file mode 100644
index 00000000000..d40ed968398
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Multi-producer sequencer for coordinating concurrent publishers
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and preserves the core lock-free multi-producer algorithm for IoTDB's Pipe 
module.
+ *
+ * <p>Key features preserved from LMAX Disruptor:
+ *
+ * <ul>
+ *   <li>Lock-free CAS-based sequence claiming
+ *   <li>Availability buffer for out-of-order publishing detection
+ *   <li>Backpressure via gating sequences
+ *   <li>Cache line padding to prevent false sharing
+ * </ul>
+ */
+public final class MultiProducerSequencer {
+
+  /** Ring buffer size (must be power of 2) - immutable after construction */
+  private final int bufferSize;
+
+  /**
+   * Producer cursor tracking the highest claimed sequence. It is advanced via CAS in next(), and
+   * its volatile reads/writes are handled by the Sequence class.
+   */
+  private final Sequence cursor = new Sequence();
+
+  /**
+   * Array of consumer sequences for backpressure control. The field MUST be volatile for safe
+   * publication when modified by SequenceGroups; the array reference is replaced atomically via
+   * AtomicReferenceFieldUpdater.
+   */
+  volatile Sequence[] gatingSequences;
+
+  /**
+   * Cached minimum gating sequence to reduce contention. Updated opportunistically in next() to
+   * avoid an expensive array scan. Does not need to be perfectly accurate (conservative is
+   * safe).
+   */
+  private final Sequence gatingSequenceCache = new Sequence();
+
+  /**
+   * CRITICAL: Availability flags for tracking published sequences
+   *
+   * <p>Handles out-of-order publishing in the multi-producer scenario: if thread A claims seq 10
+   * and is still writing while thread B claims seq 11, finishes and publishes, the consumer MUST
+   * wait for seq 10 before reading seq 11.
+   *
+   * <p>Memory visibility guarantees: writers use lazySet() (store-store barrier, cheaper than a
+   * volatile write) and readers use get() (volatile read, ensures visibility across threads).
+   *
+   * <p>AtomicIntegerArray provides same semantics as Unsafe without reflection
+   */
+  private final AtomicIntegerArray availableBuffer;
+
+  /** Mask for fast modulo: sequence & indexMask == sequence % bufferSize */
+  private final int indexMask;
+
+  /** Shift for calculating wrap count: sequence >>> indexShift */
+  private final int indexShift;
+
+  public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) {
+    if (bufferSize < 1) {
+      throw new IllegalArgumentException("bufferSize must not be less than 1");
+    }
+    if (Integer.bitCount(bufferSize) != 1) {
+      throw new IllegalArgumentException("bufferSize must be a power of 2");
+    }
+
+    this.bufferSize = bufferSize;
+    this.gatingSequences = gatingSequences != null ? gatingSequences : new 
Sequence[0];
+    this.availableBuffer = new AtomicIntegerArray(bufferSize);
+    this.indexMask = bufferSize - 1;
+    this.indexShift = log2(bufferSize);
+
+    initialiseAvailableBuffer();
+  }
+
+  /**
+   * Claim next n sequences for publishing
+   *
+   * <p>Uses CAS loop to atomically claim sequence numbers. Implements 
backpressure by parking when
+   * buffer is full.
+   *
+   * @param n number of sequences to claim
+   * @return highest claimed sequence number
+   */
+  public long next(int n) {
+    if (n < 1) {
+      throw new IllegalArgumentException("n must be > 0");
+    }
+
+    long current;
+    long next;
+
+    do {
+      current = cursor.get();
+      next = current + n;
+
+      final long wrapPoint = next - bufferSize;
+      final long cachedGatingSequence = gatingSequenceCache.get();
+
+      if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
+        long gatingSequence = Sequence.getMinimumSequence(gatingSequences, 
current);
+
+        if (wrapPoint > gatingSequence) {
+          LockSupport.parkNanos(1);
+          continue;
+        }
+
+        gatingSequenceCache.set(gatingSequence);
+      } else if (cursor.compareAndSet(current, next)) {
+        break;
+      }
+    } while (true);
+
+    return next;
+  }
+
+  /** Publish sequence */
+  public void publish(final long sequence) {
+    setAvailable(sequence);
+  }
+
+  /** Publish batch */
+  public void publish(long lo, long hi) {
+    for (long l = lo; l <= hi; l++) {
+      setAvailable(l);
+    }
+  }
+
+  /**
+   * CORE: Check if sequence is available for consumption. Uses a volatile read to ensure
+   * visibility of published sequences.
+   */
+  public boolean isAvailable(long sequence) {
+    int index = calculateIndex(sequence);
+    int flag = calculateAvailabilityFlag(sequence);
+    return availableBuffer.get(index) == flag;
+  }
+
+  /** CORE: Get the highest contiguously published sequence (same algorithm as LMAX Disruptor) */
+  public long getHighestPublishedSequence(long lowerBound, long 
availableSequence) {
+    for (long sequence = lowerBound; sequence <= availableSequence; 
sequence++) {
+      if (!isAvailable(sequence)) {
+        return sequence - 1;
+      }
+    }
+    return availableSequence;
+  }
+
+  public Sequence getCursor() {
+    return cursor;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public long remainingCapacity() {
+    long consumed = Sequence.getMinimumSequence(gatingSequences, cursor.get());
+    long produced = cursor.get();
+    return bufferSize - (produced - consumed);
+  }
+
+  /**
+   * Add gating sequences for consumer tracking
+   *
+   * <p>Atomically adds sequences to track consumer progress
+   *
+   * @param gatingSequences consumer sequences to add
+   */
+  public void addGatingSequences(Sequence... gatingSequences) {
+    SequenceGroups.addSequences(this, this.cursor, gatingSequences);
+  }
+
+  /**
+   * Create a sequence barrier for consumers
+   *
+   * @param sequencesToTrack upstream sequences to wait for
+   * @return new sequence barrier
+   */
+  public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
+    return new SequenceBarrier(this, sequencesToTrack);
+  }
+
+  /** Initialize available buffer */
+  private void initialiseAvailableBuffer() {
+    for (int i = availableBuffer.length() - 1; i != 0; i--) {
+      setAvailableBufferValue(i, -1);
+    }
+    setAvailableBufferValue(0, -1);
+  }
+
+  /**
+   * CORE: Mark sequence as available for consumption
+   *
+   * <p>Uses lazySet(), which provides a store-store barrier (all prior writes are visible
+   * first) and is cheaper than a full volatile write (no store-load barrier). This is
+   * sufficient here because readers use a volatile get().
+   */
+  private void setAvailable(final long sequence) {
+    setAvailableBufferValue(calculateIndex(sequence), 
calculateAvailabilityFlag(sequence));
+  }
+
+  /**
+   * Set availability flag with release semantics. lazySet() ensures previous event writes are
+   * visible before the flag update.
+   */
+  private void setAvailableBufferValue(int index, int flag) {
+    availableBuffer.lazySet(index, flag);
+  }
+
+  /** Calculate availability flag */
+  private int calculateAvailabilityFlag(final long sequence) {
+    return (int) (sequence >>> indexShift);
+  }
+
+  /** Calculate index */
+  private int calculateIndex(final long sequence) {
+    return ((int) sequence) & indexMask;
+  }
+
+  /**
+   * Calculate log2 for index shift calculation
+   *
+   * @param i input value (must be power of 2)
+   * @return log2 of input
+   */
+  private static int log2(int i) {
+    int r = 0;
+    while ((i >>= 1) != 0) {
+      ++r;
+    }
+    return r;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
new file mode 100644
index 00000000000..2af784b603d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+/**
+ * Left-hand side padding for cache line alignment
+ *
+ * <p>Prevents false sharing by ensuring RingBuffer fields don't share cache 
lines with preceding
+ * objects
+ */
+abstract class RingBufferPad {
+  protected long p1, p2, p3, p4, p5, p6, p7;
+}
+
+/**
+ * Core fields for RingBuffer implementation
+ *
+ * <p>Contains the actual event storage array and sequencing state
+ */
+abstract class RingBufferFields<E> extends RingBufferPad {
+  /** Pre-allocated event storage, one slot per ring position (the array itself is not padded) */
+  private final Object[] entries;
+
+  /** Total number of events in the buffer (must be power of 2) */
+  protected final int bufferSize;
+
+  /** Mask for fast modulo operation (bufferSize - 1) */
+  protected final int indexMask;
+
+  /** Sequencer for managing producer/consumer coordination */
+  protected final MultiProducerSequencer sequencer;
+
+  /**
+   * Initialize ring buffer fields
+   *
+   * @param eventFactory factory for pre-allocating events
+   * @param sequencer multi-producer sequencer
+   */
+  RingBufferFields(EventFactory<E> eventFactory, MultiProducerSequencer 
sequencer) {
+    this.sequencer = sequencer;
+    this.bufferSize = sequencer.getBufferSize();
+
+    if (bufferSize < 1) {
+      throw new IllegalArgumentException("bufferSize must not be less than 1");
+    }
+    if (Integer.bitCount(bufferSize) != 1) {
+      throw new IllegalArgumentException("bufferSize must be a power of 2");
+    }
+
+    this.indexMask = bufferSize - 1;
+    // Allocate exactly bufferSize slots; the array carries no extra padding elements
+    this.entries = new Object[bufferSize];
+    fill(eventFactory);
+  }
+
+  /**
+   * Pre-allocate all events in the buffer
+   *
+   * @param eventFactory factory for creating event instances
+   */
+  private void fill(EventFactory<E> eventFactory) {
+    for (int i = 0; i < bufferSize; i++) {
+      // Slot i holds the pre-allocated event for every sequence with (sequence & indexMask) == i
+      entries[i] = eventFactory.newInstance();
+    }
+  }
+
+  /**
+   * Get the event stored at the ring slot for a sequence (plain array read)
+   *
+   * @param sequence sequence number
+   * @return event at the sequence position
+   */
+  @SuppressWarnings("unchecked")
+  protected final E elementAt(long sequence) {
+    // Plain array read (no Unsafe); publication ordering is provided by the sequencer flags
+    return (E) entries[(int) (sequence & indexMask)];
+  }
+}
+
+/**
+ * Lock-free ring buffer for storing pre-allocated event objects
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and preserves the core ring buffer algorithm for IoTDB's Pipe module.
+ *
+ * <p>Supports multi-producer concurrent access with zero-garbage design. 
Events are pre-allocated
+ * and reused, avoiding GC pressure. Uses cache line padding to prevent false 
sharing.
+ *
+ * @param <E> event type
+ */
+public final class RingBuffer<E> extends RingBufferFields<E> {
+  /** Initial cursor value for the ring buffer */
+  public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
+
+  /**
+   * Right-hand side padding for cache line alignment
+   *
+   * <p>Prevents false sharing by ensuring RingBuffer fields don't share cache 
lines with following
+   * objects
+   */
+  protected long p1, p2, p3, p4, p5, p6, p7;
+
+  /**
+   * Construct a RingBuffer with given factory and sequencer
+   *
+   * @param eventFactory factory to create and pre-allocate events
+   * @param sequencer multi-producer sequencer for sequence management
+   */
+  private RingBuffer(EventFactory<E> eventFactory, MultiProducerSequencer 
sequencer) {
+    super(eventFactory, sequencer);
+  }
+
+  /**
+   * Create a multi-producer RingBuffer
+   *
+   * <p>Supports concurrent publishing from multiple threads using lock-free 
CAS operations
+   *
+   * @param factory event factory for creating event instances
+   * @param bufferSize buffer size (must be power of 2)
+   * @param <E> event type
+   * @return newly created ring buffer
+   */
+  public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, 
int bufferSize) {
+    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, 
new Sequence[0]);
+    return new RingBuffer<>(factory, sequencer);
+  }
+
+  /**
+   * Get the event at a specific sequence
+   *
+   * @param sequence sequence number to retrieve
+   * @return event at the given sequence
+   */
+  public E get(long sequence) {
+    return elementAt(sequence);
+  }
+
+  /**
+   * Claim the next sequence for publishing
+   *
+   * <p>Blocks if buffer is full until space becomes available
+   *
+   * @return claimed sequence number
+   */
+  public long next() {
+    return sequencer.next(1);
+  }
+
+  /**
+   * Claim next n sequences for batch publishing
+   *
+   * @param n number of sequences to claim
+   * @return highest claimed sequence number
+   */
+  public long next(int n) {
+    return sequencer.next(n);
+  }
+
+  /**
+   * Publish a single sequence
+   *
+   * <p>Makes the event at this sequence visible to consumers
+   *
+   * @param sequence sequence to publish
+   */
+  public void publish(long sequence) {
+    sequencer.publish(sequence);
+  }
+
+  /**
+   * Publish a batch of sequences
+   *
+   * @param lo lowest sequence in the batch (inclusive)
+   * @param hi highest sequence in the batch (inclusive)
+   */
+  public void publish(long lo, long hi) {
+    sequencer.publish(lo, hi);
+  }
+
+  /**
+   * Publish event using a translator function
+   *
+   * <p>Provides a higher-level API for publishing events with custom 
translation logic
+   *
+   * @param translator function to populate the event
+   * @param arg0 argument passed to translator
+   * @param <A> argument type
+   */
+  public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) {
+    final long sequence = sequencer.next(1);
+    translateAndPublish(translator, sequence, arg0);
+  }
+
+  /**
+   * Translate the event, then publish its sequence even if the translator throws
+   *
+   * @param translator event translator function
+   * @param sequence claimed sequence number
+   * @param arg0 argument for translation
+   * @param <A> argument type
+   */
+  private <A> void translateAndPublish(EventTranslator<E, A> translator, long 
sequence, A arg0) {
+    try {
+      translator.translateTo(get(sequence), sequence, arg0);
+    } finally {
+      sequencer.publish(sequence);
+    }
+  }
+
+  /**
+   * Add gating sequences for consumer tracking
+   *
+   * <p>Gating sequences represent consumer progress and prevent overwriting 
unprocessed events
+   *
+   * @param gatingSequences consumer sequences to track
+   */
+  public void addGatingSequences(Sequence... gatingSequences) {
+    sequencer.addGatingSequences(gatingSequences);
+  }
+
+  /**
+   * Create a sequence barrier for consumers
+   *
+   * <p>Barrier coordinates when events become available for processing
+   *
+   * @param sequencesToTrack upstream sequences to wait for
+   * @return new sequence barrier
+   */
+  public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
+    return sequencer.newBarrier(sequencesToTrack);
+  }
+
+  /**
+   * Get current producer cursor position
+   *
+   * @return current cursor value
+   */
+  public long getCursor() {
+    return sequencer.getCursor().get();
+  }
+
+  /**
+   * Get the buffer size
+   *
+   * @return configured buffer size
+   */
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  /**
+   * Get remaining capacity in the buffer
+   *
+   * @return number of available slots
+   */
+  public long remainingCapacity() {
+    return sequencer.remainingCapacity();
+  }
+
+  /**
+   * Function interface for translating data into events
+   *
+   * @param <E> event type
+   * @param <A> argument type
+   */
+  @FunctionalInterface
+  public interface EventTranslator<E, A> {
+    /**
+     * Translate argument into event
+     *
+     * @param event pre-allocated event to populate
+     * @param sequence sequence number for this event
+     * @param arg source data
+     */
+    void translateTo(E event, long sequence, A arg);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java
new file mode 100644
index 00000000000..1f1d3445969
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Left-hand side padding for cache line alignment */
+class LhsPadding {
+  protected long p1, p2, p3, p4, p5, p6, p7;
+}
+
+/** Value class holding the actual sequence */
+class Value extends LhsPadding {
+  protected AtomicLong value = new AtomicLong();
+}
+
+/** Right-hand side padding for cache line alignment */
+class RhsPadding extends Value {
+  protected long p9, p10, p11, p12, p13, p14, p15;
+}
+
+/**
+ * Lock-free sequence counter with cache line padding
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and preserves the core sequence tracking mechanism for IoTDB's Pipe module.
+ *
+ * <p>Key design features:
+ *
+ * <ul>
+ *   <li>Three-level inheritance ensures proper field ordering for padding
+ *   <li>Uses AtomicLong for thread-safe atomic operations
+ *   <li>Cache line padding prevents false sharing between CPU cores
+ *   <li>Writes are volatile (AtomicLong#set) or CAS; no ordered/lazy write is provided
+ * </ul>
+ */
+public class Sequence extends RhsPadding {
+  public static final long INITIAL_VALUE = -1L;
+
+  /** Create sequence with initial value -1 */
+  public Sequence() {
+    value.set(INITIAL_VALUE);
+  }
+
+  /** Volatile read */
+  public long get() {
+    return value.get();
+  }
+
+  /**
+   * Volatile write (delegates to AtomicLong#set)
+   *
+   * <p>NOTE: this is a full volatile store, not an ordered (lazySet-style) write
+   */
+  public void set(final long value) {
+    this.value.set(value);
+  }
+
+  /**
+   * CAS operation - CORE for lock-free design
+   *
+   * @param expectedValue expected current value
+   * @param newValue new value
+   * @return true if successful
+   */
+  public boolean compareAndSet(final long expectedValue, final long newValue) {
+    return value.compareAndSet(expectedValue, newValue);
+  }
+
+  /** Atomically increment */
+  public long incrementAndGet() {
+    return addAndGet(1L);
+  }
+
+  /** Atomically add */
+  public long addAndGet(final long increment) {
+    long currentValue;
+    long newValue;
+
+    do {
+      currentValue = get();
+      newValue = currentValue + increment;
+    } while (!compareAndSet(currentValue, newValue));
+
+    return newValue;
+  }
+
+  @Override
+  public String toString() {
+    return Long.toString(get());
+  }
+
+  /** Get minimum sequence from array - CORE utility method */
+  public static long getMinimumSequence(final Sequence[] sequences, long 
minimum) {
+    for (int i = 0, n = sequences.length; i < n; i++) {
+      long value = sequences[i].get();
+      minimum = Math.min(minimum, value);
+    }
+    return minimum;
+  }
+
+  public static long getMinimumSequence(final Sequence[] sequences) {
+    return getMinimumSequence(sequences, Long.MAX_VALUE);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
new file mode 100644
index 00000000000..80f41162fc7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+/**
+ * Sequence barrier for consumer coordination
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and simplified for IoTDB's Pipe module (removed Alert mechanism - IoTDB 
doesn't need it).
+ *
+ * <p>Core features preserved from LMAX Disruptor:
+ *
+ * <ul>
+ *   <li>waitFor() logic for waiting sequences
+ *   <li>Scan available buffer for out-of-order publishing
+ * </ul>
+ */
+public class SequenceBarrier {
+  private final MultiProducerSequencer sequencer;
+  private final Sequence[] dependentSequences;
+
+  public SequenceBarrier(MultiProducerSequencer sequencer, Sequence[] 
dependentSequences) {
+    this.sequencer = sequencer;
+    this.dependentSequences = dependentSequences != null ? dependentSequences 
: new Sequence[0];
+  }
+
+  /**
+   * CORE: Wait for sequence to become available (MUST keep logic)
+   *
+   * @param sequence sequence to wait for
+   * @return highest available sequence
+   * @throws InterruptedException if interrupted
+   */
+  public long waitFor(long sequence) throws InterruptedException {
+    // Wait for cursor
+    long availableSequence;
+    while ((availableSequence = sequencer.getCursor().get()) < sequence) {
+      Thread.sleep(1);
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+
+    // Wait for dependent sequences
+    if (dependentSequences.length > 0) {
+      while (Sequence.getMinimumSequence(dependentSequences) < sequence) {
+        Thread.sleep(1);
+        if (Thread.currentThread().isInterrupted()) {
+          throw new InterruptedException();
+        }
+      }
+    }
+
+    // CORE: Scan available buffer for highest continuously published sequence
+    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
+  }
+
+  public long getCursor() {
+    return sequencer.getCursor().get();
+  }
+
+  public long getHighestPublishedSequence(long lowerBound, long 
availableSequence) {
+    return sequencer.getHighestPublishedSequence(lowerBound, 
availableSequence);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
new file mode 100644
index 00000000000..af5039070f0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Utility for atomic management of sequence arrays
+ *
+ * <p>This implementation is based on LMAX Disruptor 
(https://github.com/LMAX-Exchange/disruptor)
+ * and adapted for IoTDB's Pipe module.
+ *
+ * <p>Provides a thread-safe operation for adding sequences to the gating sequence array used to
+ * track consumer progress.
+ */
+final class SequenceGroups {
+
+  /** Field updater for atomic array replacement */
+  private static final AtomicReferenceFieldUpdater<MultiProducerSequencer, 
Sequence[]>
+      SEQUENCE_UPDATER =
+          AtomicReferenceFieldUpdater.newUpdater(
+              MultiProducerSequencer.class, Sequence[].class, 
"gatingSequences");
+
+  /**
+   * Atomically add sequences to the gating sequence array
+   *
+   * <p>Uses CAS loop to ensure thread-safe addition even under concurrent 
modification
+   *
+   * @param sequencer the multi-producer sequencer
+   * @param cursor the current cursor sequence
+   * @param sequencesToAdd sequences to add
+   */
+  static void addSequences(
+      final MultiProducerSequencer sequencer,
+      final Sequence cursor,
+      final Sequence... sequencesToAdd) {
+    long cursorSequence;
+    Sequence[] updatedSequences;
+    Sequence[] currentSequences;
+
+    do {
+      currentSequences = sequencer.gatingSequences;
+      updatedSequences = new Sequence[currentSequences.length + 
sequencesToAdd.length];
+      System.arraycopy(currentSequences, 0, updatedSequences, 0, 
currentSequences.length);
+
+      cursorSequence = cursor.get();
+
+      int index = currentSequences.length;
+      for (Sequence sequence : sequencesToAdd) {
+        sequence.set(cursorSequence);
+        updatedSequences[index++] = sequence;
+      }
+    } while (!SEQUENCE_UPDATER.compareAndSet(sequencer, currentSequences, 
updatedSequences));
+
+    cursorSequence = cursor.get();
+    for (Sequence sequence : sequencesToAdd) {
+      sequence.set(cursorSequence);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index d6cfa6f6abc..aaa98220178 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -68,26 +68,35 @@ public class PipeInsertionDataNodeListener {
   }
 
   public synchronized void stopListenAndAssign(
-      String dataRegionId, PipeRealtimeDataRegionSource extractor) {
-    final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
-    if (assigner == null) {
-      return;
-    }
-
-    assigner.stopAssignTo(extractor);
-
-    if (extractor.isNeedListenToTsFile()) {
-      listenToTsFileExtractorCount.decrementAndGet();
-    }
-    if (extractor.isNeedListenToInsertNode()) {
-      listenToInsertNodeExtractorCount.decrementAndGet();
+      final String dataRegionId, final PipeRealtimeDataRegionSource extractor) 
{
+    PipeDataRegionAssigner assignerToClose = null;
+
+    synchronized (this) {
+      final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
+      if (assigner == null) {
+        return;
+      }
+
+      assigner.stopAssignTo(extractor);
+
+      if (extractor.isNeedListenToTsFile()) {
+        listenToTsFileExtractorCount.decrementAndGet();
+      }
+      if (extractor.isNeedListenToInsertNode()) {
+        listenToInsertNodeExtractorCount.decrementAndGet();
+      }
+
+      if (assigner.notMoreSourceNeededToBeAssigned()) {
+        // The removed assigner will is the same as the one referenced by the 
variable `assigner`
+        dataRegionId2Assigner.remove(dataRegionId);
+        // This will help to release the memory occupied by the assigner
+        assignerToClose = assigner;
+      }
     }
 
-    if (assigner.notMoreExtractorNeededToBeAssigned()) {
-      // The removed assigner will is the same as the one referenced by the 
variable `assigner`
-      dataRegionId2Assigner.remove(dataRegionId);
-      // This will help to release the memory occupied by the assigner
-      assigner.close();
+    if (assignerToClose != null) {
+      // Closing the disruptor may block for a while, so keep it out of the 
global listener lock.
+      assignerToClose.close();
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
new file mode 100644
index 00000000000..3fd40c4d4f2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DisruptorShutdownTest { // Halt/shutdown tests for BatchEventProcessor and Disruptor, driven by the stub barriers defined below.
+
+  @Test
+  public void 
testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws 
Exception { // Expects an already-published event to be handled when the processor is halted while blocked in waitFor.
+    final RingBuffer<TestEvent> ringBuffer = 
RingBuffer.createMultiProducer(TestEvent::new, 32);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
1); // publish one event carrying value 1 before the processor thread starts
+
+    final TestSequenceBarrier barrier = new TestSequenceBarrier(0L); // stub barrier whose getCursor() reports 0
+    final AtomicInteger handledEventCount = new AtomicInteger();
+    final BatchEventProcessor<TestEvent> processor =
+        new BatchEventProcessor<>(
+            ringBuffer,
+            barrier,
+            (event, sequence, endOfBatch) -> 
handledEventCount.incrementAndGet()); // handler only counts invocations
+
+    final Thread processorThread = new Thread(processor, 
"pipe-batch-event-processor-test");
+    processorThread.start();
+
+    Assert.assertTrue(barrier.awaitWaitForCall()); // wait (up to 5s) until the processor thread has entered waitFor
+    processor.halt(); // halt is requested while the processor thread is still blocked inside the stub
+    barrier.interruptWait(); // release the stub so that waitFor throws InterruptedException
+
+    processorThread.join(TimeUnit.SECONDS.toMillis(5)); // bounded join: a stuck processor fails the assertion below instead of hanging the test
+
+    Assert.assertFalse(processorThread.isAlive());
+    Assert.assertEquals(1, handledEventCount.get()); // the single published event was handled exactly once
+    Assert.assertEquals(0L, processor.getSequence().get()); // NOTE(review): 0 is presumably the sequence of the single published event - confirm
+  }
+
+  @Test
+  public void 
testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting()
+      throws Exception { // Expects both events to be handled although waitFor only reports sequence 0 and halt is requested mid-batch.
+    final RingBuffer<TestEvent> ringBuffer = 
RingBuffer.createMultiProducer(TestEvent::new, 32);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
1);
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 
2); // two events published up front, carrying values 1 and 2
+
+    final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L, 
1L); // waitFor always returns 0 while getCursor reports 1
+    final AtomicInteger handledEventCount = new AtomicInteger();
+    final AtomicReference<BatchEventProcessor<TestEvent>> processorReference =
+        new AtomicReference<>(); // lets the handler lambda reach the processor that is constructed after it
+    final BatchEventProcessor<TestEvent> processor =
+        new BatchEventProcessor<>(
+            ringBuffer,
+            barrier,
+            (event, sequence, endOfBatch) -> {
+              handledEventCount.incrementAndGet();
+              if (event.value == 1) {
+                processorReference.get().halt(); // request halt from inside the handler while handling the event with value 1
+              }
+            });
+    processorReference.set(processor); // set before the thread is started, so the handler never reads null
+
+    final Thread processorThread =
+        new Thread(processor, "pipe-batch-event-processor-snapshot-test");
+    processorThread.start();
+    processorThread.join(TimeUnit.SECONDS.toMillis(5)); // bounded join: the processor is expected to exit on its own after the halt
+
+    Assert.assertFalse(processorThread.isAlive());
+    Assert.assertEquals(2, handledEventCount.get()); // both published events were handled
+    Assert.assertEquals(1L, processor.getSequence().get()); // NOTE(review): 1 is presumably the sequence of the second published event - confirm
+  }
+
+  @Test
+  public void testDisruptorShutdownInterruptsWaitingProcessor() throws 
Exception { // Expects the processor thread to be dead by the time shutdown() returns.
+    final AtomicReference<Thread> processorThreadReference = new 
AtomicReference<>();
+    final ThreadFactory threadFactory =
+        runnable -> {
+          final Thread thread = new Thread(runnable, 
"pipe-disruptor-shutdown-test");
+          processorThreadReference.set(thread); // capture the created thread so the test can inspect it later
+          return thread;
+        };
+
+    final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, 
threadFactory);
+    disruptor.handleEventsWith((event, sequence, endOfBatch) -> {}); // no-op handler; this test never publishes an event
+    disruptor.start();
+
+    final Thread processorThread = processorThreadReference.get();
+    Assert.assertNotNull(processorThread); // the thread factory must have been invoked by this point
+
+    TimeUnit.MILLISECONDS.sleep(50); // NOTE(review): presumably gives the processor time to start waiting; timing-based - confirm
+    disruptor.shutdown();
+
+    Assert.assertFalse(processorThread.isAlive()); // checked right after shutdown() returns, with no additional join
+  }
+
+  private static class TestEvent { // Minimal event type stored in the ring buffer by these tests.
+    private int value; // payload written by the publishEvent translators above
+  }
+
+  private static class TestSequenceBarrier extends SequenceBarrier { // Stub barrier whose waitFor blocks until released and then throws InterruptedException.
+
+    private final long cursor; // fixed value returned by getCursor()
+    private final CountDownLatch waitForCalled = new CountDownLatch(1); // counted down when waitFor is entered
+    private final CountDownLatch interruptWait = new CountDownLatch(1); // counted down by interruptWait() to unblock waitFor
+
+    private TestSequenceBarrier(final long cursor) {
+      super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]); // NOTE(review): superclass arguments look like placeholders since the methods below are overridden - confirm
+      this.cursor = cursor;
+    }
+
+    @Override
+    public long waitFor(final long sequence) throws InterruptedException {
+      waitForCalled.countDown(); // tell the test thread that the processor has reached waitFor
+      interruptWait.await(); // block until the test calls interruptWait()
+      throw new InterruptedException(); // never returns a sequence; always ends by throwing
+    }
+
+    @Override
+    public long getCursor() {
+      return cursor;
+    }
+
+    @Override
+    public long getHighestPublishedSequence(final long lowerBound, final long 
availableSequence) {
+      return availableSequence; // echoes availableSequence back unchanged
+    }
+
+    private boolean awaitWaitForCall() throws InterruptedException {
+      return waitForCalled.await(5, TimeUnit.SECONDS); // false if waitFor was not entered within 5 seconds
+    }
+
+    private void interruptWait() {
+      interruptWait.countDown();
+    }
+  }
+
+  private static class SnapshotSequenceBarrier extends SequenceBarrier { // Stub barrier returning fixed values from waitFor and getCursor.
+
+    private final long waitForResult; // fixed value returned by every waitFor call
+    private final long cursor; // fixed value returned by getCursor()
+
+    private SnapshotSequenceBarrier(final long waitForResult, final long 
cursor) {
+      super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]); // NOTE(review): superclass arguments look like placeholders since the methods below are overridden - confirm
+      this.waitForResult = waitForResult;
+      this.cursor = cursor;
+    }
+
+    @Override
+    public long waitFor(final long sequence) {
+      return waitForResult; // never blocks; ignores the requested sequence
+    }
+
+    @Override
+    public long getCursor() {
+      return cursor;
+    }
+
+    @Override
+    public long getHighestPublishedSequence(final long lowerBound, final long 
availableSequence) {
+      return availableSequence; // echoes availableSequence back unchanged
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 294a1231e35..cbe907c71b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,6 @@
         <commons-pool2.version>2.11.1</commons-pool2.version>
         <commons.collections4.version>4.4</commons.collections4.version>
         <ctest.skip.tests>false</ctest.skip.tests>
-        <disruptor.version>3.4.4</disruptor.version>
         
<drill.freemarker.maven.plugin.version>1.21.1</drill.freemarker.maven.plugin.version>
         <dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
         <eclipse-collections.version>11.1.0</eclipse-collections.version>
@@ -454,11 +453,6 @@
                 <artifactId>h2-mvstore</artifactId>
                 <version>${h2.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.lmax</groupId>
-                <artifactId>disruptor</artifactId>
-                <version>${disruptor.version}</version>
-            </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents</groupId>
                 <artifactId>httpclient</artifactId>

Reply via email to