This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 250d97a32a1 [To dev/1.3] Pipe: Fixed the bug that Disruptor may not
clear the reference & will wait long time after pipe close (#17549) (#17569)
250d97a32a1 is described below
commit 250d97a32a16a8334dbb1012638bab4cafdf453a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 11:19:04 2026 +0800
[To dev/1.3] Pipe: Fixed the bug that Disruptor may not clear the reference
& will wait long time after pipe close (#17549) (#17569)
* Pipe: Implementing DisruptorQueue (#16639)
* Pipe: Fixed the bug that Disruptor may not clear the reference & will
wait long time after pipe close (#17549)
* fix
* fix
* del
---------
Co-authored-by: Zhenyu Luo <[email protected]>
---
LICENSE | 12 +
iotdb-core/datanode/pom.xml | 4 -
.../event/common/heartbeat/PipeHeartbeatEvent.java | 4 +-
.../realtime/assigner/DisruptorQueue.java | 15 +-
.../assigner/DisruptorQueueExceptionHandler.java | 3 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../realtime/disruptor/BatchEventProcessor.java | 149 +++++++++++
.../dataregion/realtime/disruptor/Disruptor.java | 139 ++++++++++
.../EventFactory.java} | 40 ++-
.../EventHandler.java} | 43 ++-
.../ExceptionHandler.java} | 43 +--
.../realtime/disruptor/MultiProducerSequencer.java | 259 ++++++++++++++++++
.../dataregion/realtime/disruptor/RingBuffer.java | 295 +++++++++++++++++++++
.../dataregion/realtime/disruptor/Sequence.java | 122 +++++++++
.../realtime/disruptor/SequenceBarrier.java | 82 ++++++
.../realtime/disruptor/SequenceGroups.java | 77 ++++++
.../listener/PipeInsertionDataNodeListener.java | 45 ++--
.../realtime/disruptor/DisruptorShutdownTest.java | 183 +++++++++++++
pom.xml | 6 -
19 files changed, 1415 insertions(+), 108 deletions(-)
diff --git a/LICENSE b/LICENSE
index 3f3de45e81a..0a1e5e42a9e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -303,3 +303,15 @@ The following files include code modified from Dropwizard
Metrics project.
Copyright (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
Project page: https://github.com/dropwizard/metrics
License: https://github.com/dropwizard/metrics/blob/release/4.2.x/LICENSE
+
+--------------------------------------------------------------------------------
+
+The following files include code modified from LMax Disruptor project.
+
+./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/*
+
+LMax Disruptor is open source software licensed under the Apache License 2.0
and supported by the Apache Software Foundation.
+Project page: https://github.com/LMAX-Exchange/disruptor
+License: https://github.com/LMAX-Exchange/disruptor/blob/master/LICENCE.txt
+
+--------------------------------------------------------------------------------
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index d6f8d124345..299e50f1be9 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -308,10 +308,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>com.lmax</groupId>
- <artifactId>disruptor</artifactId>
- </dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index ad29b285442..567e14f4fe3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -27,10 +27,10 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
-import com.lmax.disruptor.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -183,7 +183,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
/////////////////////////////// Queue size Reporting
///////////////////////////////
- public void recordDisruptorSize(final RingBuffer<?> ringBuffer) {
+ public void recordDisruptorSize(final RingBuffer ringBuffer) {
if (shouldPrintMessage) {
disruptorSize = ringBuffer.getBufferSize() - (int)
ringBuffer.remainingCapacity();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 2a2fa110749..52ac137ae4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -26,12 +26,10 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,9 +73,8 @@ public class DisruptorQueue {
32,
Math.toIntExact(
allocatedMemoryBlock.getMemoryUsageInBytes() /
ringBufferEntrySizeInBytes)),
- THREAD_FACTORY,
- ProducerType.MULTI,
- new BlockingWaitStrategy());
+ THREAD_FACTORY);
+
disruptor.handleEventsWith(
(container, sequence, endOfBatch) -> {
final PipeRealtimeEvent realtimeEvent = container.getEvent();
@@ -127,7 +124,7 @@ public class DisruptorQueue {
private static class EventContainer {
- private PipeRealtimeEvent event;
+ private volatile PipeRealtimeEvent event;
private EventContainer() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
index 91ad0224fc5..5330f3486f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
-import com.lmax.disruptor.ExceptionHandler;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.ExceptionHandler;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index e3067a13126..0b4eb547144 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -202,7 +202,7 @@ public class PipeDataRegionAssigner implements Closeable {
matcher.deregister(extractor);
}
- public boolean notMoreExtractorNeededToBeAssigned() {
+ public boolean notMoreSourceNeededToBeAssigned() {
return matcher.getRegisterCount() == 0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
new file mode 100644
index 00000000000..d0432821cf7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Batch event processor for consuming events
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and simplified for IoTDB's Pipe module (removed complex lifecycle
management).
+ *
+ * <p>Core algorithm preserved from LMAX Disruptor:
+ *
+ * <ul>
+ * <li>Batch processing loop
+ * <li>Sequence tracking
+ * <li>endOfBatch detection
+ * </ul>
+ *
+ * @param <T> event type
+ */
+public final class BatchEventProcessor<T> implements Runnable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BatchEventProcessor.class);
+
+ private final RingBuffer<T> ringBuffer;
+ private final SequenceBarrier sequenceBarrier;
+ private final EventHandler<? super T> eventHandler;
+ private final Sequence sequence = new Sequence();
+ private ExceptionHandler<? super T> exceptionHandler = new
DefaultExceptionHandler<>();
+ private volatile boolean running = true;
+
+ public BatchEventProcessor(
+ RingBuffer<T> ringBuffer, SequenceBarrier barrier, EventHandler<? super
T> eventHandler) {
+ this.ringBuffer = ringBuffer;
+ this.sequenceBarrier = barrier;
+ this.eventHandler = eventHandler;
+ }
+
+ public Sequence getSequence() {
+ return sequence;
+ }
+
+ public void setExceptionHandler(ExceptionHandler<? super T>
exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ public void halt() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ long nextSequence = sequence.get() + 1L;
+
+ while (running) {
+ try {
+ // Wait for available sequence
+ final long availableSequence = sequenceBarrier.waitFor(nextSequence);
+
+ // Batch process all available events
+ nextSequence = processAvailableEvents(nextSequence, availableSequence);
+
+ } catch (final InterruptedException ex) {
+ if (running) {
+ Thread.currentThread().interrupt();
+ LOGGER.info("Processor interrupted");
+ }
+ break;
+ } catch (final Throwable ex) {
+ exceptionHandler.handleEventException(ex, nextSequence,
ringBuffer.get(nextSequence));
+ sequence.set(nextSequence);
+ nextSequence++;
+ }
+ }
+
+ if (!running) {
+ drainRemainingPublishedEvents(nextSequence);
+ }
+ LOGGER.info("Processor stopped");
+ }
+
+ private long processAvailableEvents(long nextSequence, long
availableSequence) throws Throwable {
+ while (nextSequence <= availableSequence) {
+ final T event = ringBuffer.get(nextSequence);
+ eventHandler.onEvent(event, nextSequence, nextSequence ==
availableSequence);
+ nextSequence++;
+ }
+
+ sequence.set(availableSequence);
+ return nextSequence;
+ }
+
+ private void drainRemainingPublishedEvents(long nextSequence) {
+ final long availableSequence = sequenceBarrier.getCursor();
+ if (availableSequence < nextSequence) {
+ return;
+ }
+
+ final long highestPublishedSequence =
+ sequenceBarrier.getHighestPublishedSequence(nextSequence,
availableSequence);
+ while (nextSequence <= highestPublishedSequence) {
+ final T event = ringBuffer.get(nextSequence);
+ try {
+ eventHandler.onEvent(event, nextSequence, nextSequence ==
highestPublishedSequence);
+ } catch (final Throwable ex) {
+ exceptionHandler.handleEventException(ex, nextSequence, event);
+ } finally {
+ sequence.set(nextSequence);
+ }
+ nextSequence++;
+ }
+ }
+
+ private static class DefaultExceptionHandler<T> implements
ExceptionHandler<T> {
+ @Override
+ public void handleEventException(Throwable ex, long sequence, T event) {
+ LoggerFactory.getLogger(getClass()).error("Exception processing: {} {}",
sequence, event, ex);
+ }
+
+ @Override
+ public void handleOnStartException(Throwable ex) {
+ LoggerFactory.getLogger(getClass()).error("Exception during onStart()",
ex);
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable ex) {
+ LoggerFactory.getLogger(getClass()).error("Exception during
onShutdown()", ex);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
new file mode 100644
index 00000000000..f3a64701285
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Simplified Disruptor implementation for IoTDB Pipe
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and simplified for IoTDB's specific use case in the Pipe module.
+ *
+ * <p>Key simplifications:
+ *
+ * <ul>
+ * <li>Single event handler support (no complex dependency graphs)
+ * <li>Simplified lifecycle management
+ * <li>Removed wait strategies (using simple sleep-based waiting)
+ * </ul>
+ *
+ * @param <T> event type
+ */
+public class Disruptor<T> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Disruptor.class);
+
+ private final RingBuffer<T> ringBuffer;
+ private final ThreadFactory threadFactory;
+ private BatchEventProcessor<T> processor;
+ private Thread processorThread;
+ private ExceptionHandler<? super T> exceptionHandler;
+ private volatile boolean started = false;
+
+ /**
+ * Create a Disruptor instance
+ *
+ * @param eventFactory factory for creating pre-allocated events
+ * @param ringBufferSize buffer size (must be power of 2)
+ * @param threadFactory factory for creating consumer thread
+ */
+ public Disruptor(EventFactory<T> eventFactory, int ringBufferSize,
ThreadFactory threadFactory) {
+ this.ringBuffer = RingBuffer.createMultiProducer(eventFactory,
ringBufferSize);
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Configure event handler for processing events
+ *
+ * <p>Creates a batch event processor that will run in its own thread
+ *
+ * @param handler event handler implementation
+ * @return this instance for method chaining
+ */
+ public Disruptor<T> handleEventsWith(final EventHandler<? super T> handler) {
+ SequenceBarrier barrier = ringBuffer.newBarrier();
+ processor = new BatchEventProcessor<>(ringBuffer, barrier, handler);
+
+ if (exceptionHandler != null) {
+ processor.setExceptionHandler(exceptionHandler);
+ }
+
+ ringBuffer.addGatingSequences(processor.getSequence());
+ return this;
+ }
+
+ /**
+ * Set exception handler for error handling
+ *
+ * @param exceptionHandler handler for processing exceptions
+ */
+ public void setDefaultExceptionHandler(ExceptionHandler<? super T>
exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ if (processor != null) {
+ processor.setExceptionHandler(exceptionHandler);
+ }
+ }
+
+ public RingBuffer<T> start() {
+ if (started) {
+ throw new IllegalStateException("Disruptor already started");
+ }
+
+ if (processor == null) {
+ throw new IllegalStateException("No event handler configured");
+ }
+
+ processorThread = threadFactory.newThread(processor);
+ processorThread.start();
+ started = true;
+
+ LOGGER.info("Disruptor started with buffer size: {}",
ringBuffer.getBufferSize());
+ return ringBuffer;
+ }
+
+ public void shutdown() {
+ if (!started) {
+ return;
+ }
+
+ if (processor != null) {
+ processor.halt();
+ }
+
+ if (processorThread != null) {
+ try {
+ processorThread.interrupt();
+ processorThread.join(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted waiting for processor to stop");
+ }
+ if (processorThread.isAlive()) {
+ LOGGER.warn("Timed out waiting for processor to stop");
+ }
+ }
+
+ started = false;
+ LOGGER.info("Disruptor shutdown completed");
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java
index 91ad0224fc5..785033c6824 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java
@@ -17,28 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
-import com.lmax.disruptor.ExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueExceptionHandler implements
ExceptionHandler<Object> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
-
- @Override
- public void handleEventException(final Throwable ex, final long sequence,
final Object event) {
- LOGGER.error("Exception processing: {} {}", sequence, event, ex);
- }
-
- @Override
- public void handleOnStartException(final Throwable ex) {
- LOGGER.warn("Exception during onStart()", ex);
- }
-
- @Override
- public void handleOnShutdownException(final Throwable ex) {
- LOGGER.warn("Exception during onShutdown()", ex);
- }
+/**
+ * Event factory for pre-allocating events in RingBuffer
+ *
+ * <p>This interface is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor) and
+ * adapted for IoTDB's Pipe module.
+ *
+ * @param <T> event type
+ */
+@FunctionalInterface
+public interface EventFactory<T> {
+ /**
+ * Create new event instance
+ *
+ * @return new event
+ */
+ T newInstance();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java
index 91ad0224fc5..1fc81a37623 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java
@@ -17,28 +17,25 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
-import com.lmax.disruptor.ExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueExceptionHandler implements
ExceptionHandler<Object> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
-
- @Override
- public void handleEventException(final Throwable ex, final long sequence,
final Object event) {
- LOGGER.error("Exception processing: {} {}", sequence, event, ex);
- }
-
- @Override
- public void handleOnStartException(final Throwable ex) {
- LOGGER.warn("Exception during onStart()", ex);
- }
-
- @Override
- public void handleOnShutdownException(final Throwable ex) {
- LOGGER.warn("Exception during onShutdown()", ex);
- }
+/**
+ * Event handler for processing events from RingBuffer
+ *
+ * <p>This interface is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor) and
+ * adapted for IoTDB's Pipe module.
+ *
+ * @param <T> event type
+ */
+@FunctionalInterface
+public interface EventHandler<T> {
+ /**
+ * Handle event
+ *
+ * @param event the event
+ * @param sequence sequence number
+ * @param endOfBatch whether this is the last event in current batch
+ * @throws Exception if processing fails
+ */
+ void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java
index 91ad0224fc5..28396b51ffe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java
@@ -17,28 +17,29 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
-import com.lmax.disruptor.ExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueExceptionHandler implements
ExceptionHandler<Object> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class);
-
- @Override
- public void handleEventException(final Throwable ex, final long sequence,
final Object event) {
- LOGGER.error("Exception processing: {} {}", sequence, event, ex);
- }
+/**
+ * Exception handler for event processing errors
+ *
+ * <p>This interface is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor) and
+ * adapted for IoTDB's Pipe module.
+ *
+ * @param <T> event type
+ */
+public interface ExceptionHandler<T> {
+ /**
+ * Handle exception during event processing
+ *
+ * @param ex exception
+ * @param sequence sequence number
+ * @param event the event
+ */
+ void handleEventException(Throwable ex, long sequence, T event);
- @Override
- public void handleOnStartException(final Throwable ex) {
- LOGGER.warn("Exception during onStart()", ex);
- }
+ /** Handle exception during processor start */
+ void handleOnStartException(Throwable ex);
- @Override
- public void handleOnShutdownException(final Throwable ex) {
- LOGGER.warn("Exception during onShutdown()", ex);
- }
+ /** Handle exception during processor shutdown */
+ void handleOnShutdownException(Throwable ex);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
new file mode 100644
index 00000000000..d40ed968398
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Multi-producer sequencer for coordinating concurrent publishers
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and preserves the core lock-free multi-producer algorithm for IoTDB's Pipe
module.
+ *
+ * <p>Key features preserved from LMAX Disruptor:
+ *
+ * <ul>
+ * <li>Lock-free CAS-based sequence claiming
+ * <li>Availability buffer for out-of-order publishing detection
+ * <li>Backpressure via gating sequences
+ * <li>Cache line padding to prevent false sharing
+ * </ul>
+ */
+public final class MultiProducerSequencer {
+
+ /** Ring buffer size (must be power of 2) - immutable after construction */
+ private final int bufferSize;
+
+ /**
+ * Producer cursor tracking highest claimed sequence Updated via CAS in
next() method Volatile
+ * reads/writes handled by Sequence class
+ */
+ private final Sequence cursor = new Sequence();
+
+ /**
+ * Array of consumer sequences for backpressure control MUST be volatile for
safe publication when
+ * modified by SequenceGroups Array reference is replaced atomically via
+ * AtomicReferenceFieldUpdater
+ */
+ volatile Sequence[] gatingSequences;
+
+ /**
+ * Cached minimum gating sequence to reduce contention Updated
opportunistically in next() to
+ * avoid expensive array scan Does not need to be perfectly accurate
(conservative is safe)
+ */
+ private final Sequence gatingSequenceCache = new Sequence();
+
+ /**
+ * CRITICAL: Availability flags for tracking published sequences
+ *
+ * <p>Handles out-of-order publishing in multi-producer scenario: - Thread A
claims seq 10, still
+ * writing - Thread B claims seq 11, finishes and publishes - Consumer MUST
wait for seq 10 before
+ * reading seq 11
+ *
+ * <p>Memory visibility guarantees: - Writers use lazySet() for store-store
barrier (cheaper than
+ * volatile write) - Readers use get() for volatile read (ensures visibility
across threads)
+ *
+ * <p>AtomicIntegerArray provides same semantics as Unsafe without reflection
+ */
+ private final AtomicIntegerArray availableBuffer;
+
+ /** Mask for fast modulo: sequence & indexMask == sequence % bufferSize */
+ private final int indexMask;
+
+ /** Shift for calculating wrap count: sequence >>> indexShift */
+ private final int indexShift;
+
+ public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) {
+ if (bufferSize < 1) {
+ throw new IllegalArgumentException("bufferSize must not be less than 1");
+ }
+ if (Integer.bitCount(bufferSize) != 1) {
+ throw new IllegalArgumentException("bufferSize must be a power of 2");
+ }
+
+ this.bufferSize = bufferSize;
+ this.gatingSequences = gatingSequences != null ? gatingSequences : new
Sequence[0];
+ this.availableBuffer = new AtomicIntegerArray(bufferSize);
+ this.indexMask = bufferSize - 1;
+ this.indexShift = log2(bufferSize);
+
+ initialiseAvailableBuffer();
+ }
+
+ /**
+ * Claim next n sequences for publishing
+ *
+ * <p>Uses CAS loop to atomically claim sequence numbers. Implements
backpressure by parking when
+ * buffer is full.
+ *
+ * @param n number of sequences to claim
+ * @return highest claimed sequence number
+ */
+ public long next(int n) {
+ if (n < 1) {
+ throw new IllegalArgumentException("n must be > 0");
+ }
+
+ long current;
+ long next;
+
+ do {
+ current = cursor.get();
+ next = current + n;
+
+ final long wrapPoint = next - bufferSize;
+ final long cachedGatingSequence = gatingSequenceCache.get();
+
+ if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
+ long gatingSequence = Sequence.getMinimumSequence(gatingSequences,
current);
+
+ if (wrapPoint > gatingSequence) {
+ LockSupport.parkNanos(1);
+ continue;
+ }
+
+ gatingSequenceCache.set(gatingSequence);
+ } else if (cursor.compareAndSet(current, next)) {
+ break;
+ }
+ } while (true);
+
+ return next;
+ }
+
+ /** Publish sequence */
+ public void publish(final long sequence) {
+ setAvailable(sequence);
+ }
+
+ /** Publish batch */
+ public void publish(long lo, long hi) {
+ for (long l = lo; l <= hi; l++) {
+ setAvailable(l);
+ }
+ }
+
+ /**
+ * CORE: Check if sequence is available for consumption Uses volatile read
to ensure visibility of
+ * published sequences
+ */
+ public boolean isAvailable(long sequence) {
+ int index = calculateIndex(sequence);
+ int flag = calculateAvailabilityFlag(sequence);
+ return availableBuffer.get(index) == flag;
+ }
+
+ /** CORE: Get highest published - exact same algorithm */
+ public long getHighestPublishedSequence(long lowerBound, long
availableSequence) {
+ for (long sequence = lowerBound; sequence <= availableSequence;
sequence++) {
+ if (!isAvailable(sequence)) {
+ return sequence - 1;
+ }
+ }
+ return availableSequence;
+ }
+
+ public Sequence getCursor() {
+ return cursor;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public long remainingCapacity() {
+ long consumed = Sequence.getMinimumSequence(gatingSequences, cursor.get());
+ long produced = cursor.get();
+ return bufferSize - (produced - consumed);
+ }
+
+ /**
+ * Add gating sequences for consumer tracking
+ *
+ * <p>Atomically adds sequences to track consumer progress
+ *
+ * @param gatingSequences consumer sequences to add
+ */
+ public void addGatingSequences(Sequence... gatingSequences) {
+ SequenceGroups.addSequences(this, this.cursor, gatingSequences);
+ }
+
+ /**
+ * Create a sequence barrier for consumers
+ *
+ * @param sequencesToTrack upstream sequences to wait for
+ * @return new sequence barrier
+ */
+ public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
+ return new SequenceBarrier(this, sequencesToTrack);
+ }
+
+ /** Initialize available buffer */
+ private void initialiseAvailableBuffer() {
+ for (int i = availableBuffer.length() - 1; i != 0; i--) {
+ setAvailableBufferValue(i, -1);
+ }
+ setAvailableBufferValue(0, -1);
+ }
+
+ /**
+ * CORE: Mark sequence as available for consumption
+ *
+ * <p>Uses lazySet() which provides: - Store-store barrier (ensures all
prior writes are visible)
+ * - Cheaper than full volatile write (no store-load barrier) - Sufficient
for this use case
+ * (readers use volatile get)
+ */
+ private void setAvailable(final long sequence) {
+ setAvailableBufferValue(calculateIndex(sequence),
calculateAvailabilityFlag(sequence));
+ }
+
+ /**
+ * Set availability flag with release semantics lazySet() ensures previous
event writes are
+ * visible before flag update
+ */
+ private void setAvailableBufferValue(int index, int flag) {
+ availableBuffer.lazySet(index, flag);
+ }
+
+ /** Calculate availability flag */
+ private int calculateAvailabilityFlag(final long sequence) {
+ return (int) (sequence >>> indexShift);
+ }
+
+ /** Calculate index */
+ private int calculateIndex(final long sequence) {
+ return ((int) sequence) & indexMask;
+ }
+
+ /**
+ * Calculate log2 for index shift calculation
+ *
+ * @param i input value (must be power of 2)
+ * @return log2 of input
+ */
+ private static int log2(int i) {
+ int r = 0;
+ while ((i >>= 1) != 0) {
+ ++r;
+ }
+ return r;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
new file mode 100644
index 00000000000..2af784b603d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+/**
+ * Left-hand side padding for cache line alignment
+ *
+ * <p>Prevents false sharing by ensuring RingBuffer fields don't share cache
lines with preceding
+ * objects
+ */
+abstract class RingBufferPad {
+ protected long p1, p2, p3, p4, p5, p6, p7;
+}
+
+/**
+ * Core fields for RingBuffer implementation
+ *
+ * <p>Contains the actual event storage array and sequencing state
+ */
+abstract class RingBufferFields<E> extends RingBufferPad {
+ /** Pre-allocated event storage with padding to prevent false sharing */
+ private final Object[] entries;
+
+ /** Total number of events in the buffer (must be power of 2) */
+ protected final int bufferSize;
+
+ /** Mask for fast modulo operation (bufferSize - 1) */
+ protected final int indexMask;
+
+ /** Sequencer for managing producer/consumer coordination */
+ protected final MultiProducerSequencer sequencer;
+
+ /**
+ * Initialize ring buffer fields
+ *
+ * @param eventFactory factory for pre-allocating events
+ * @param sequencer multi-producer sequencer
+ */
+ RingBufferFields(EventFactory<E> eventFactory, MultiProducerSequencer
sequencer) {
+ this.sequencer = sequencer;
+ this.bufferSize = sequencer.getBufferSize();
+
+ if (bufferSize < 1) {
+ throw new IllegalArgumentException("bufferSize must not be less than 1");
+ }
+ if (Integer.bitCount(bufferSize) != 1) {
+ throw new IllegalArgumentException("bufferSize must be a power of 2");
+ }
+
+ this.indexMask = bufferSize - 1;
+ // Allocate array with padding on both sides to prevent false sharing
+ this.entries = new Object[bufferSize];
+ fill(eventFactory);
+ }
+
+ /**
+ * Pre-allocate all events in the buffer
+ *
+ * @param eventFactory factory for creating event instances
+ */
+ private void fill(EventFactory<E> eventFactory) {
+ for (int i = 0; i < bufferSize; i++) {
+ // Store events starting after front padding
+ entries[i] = eventFactory.newInstance();
+ }
+ }
+
+ /**
+ * Get event at sequence using direct memory access
+ *
+ * @param sequence sequence number
+ * @return event at the sequence position
+ */
+ @SuppressWarnings("unchecked")
+ protected final E elementAt(long sequence) {
+ // Use Unsafe for lock-free array access with proper memory barriers
+ return (E) entries[(int) (sequence & indexMask)];
+ }
+}
+
+/**
+ * Lock-free ring buffer for storing pre-allocated event objects
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and preserves the core ring buffer algorithm for IoTDB's Pipe module.
+ *
+ * <p>Supports multi-producer concurrent access with zero-garbage design.
Events are pre-allocated
+ * and reused, avoiding GC pressure. Uses cache line padding to prevent false
sharing.
+ *
+ * @param <E> event type
+ */
+public final class RingBuffer<E> extends RingBufferFields<E> {
+ /** Initial cursor value for the ring buffer */
+ public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
+
+ /**
+ * Right-hand side padding for cache line alignment
+ *
+ * <p>Prevents false sharing by ensuring RingBuffer fields don't share cache
lines with following
+ * objects
+ */
+ protected long p1, p2, p3, p4, p5, p6, p7;
+
+ /**
+ * Construct a RingBuffer with given factory and sequencer
+ *
+ * @param eventFactory factory to create and pre-allocate events
+ * @param sequencer multi-producer sequencer for sequence management
+ */
+ private RingBuffer(EventFactory<E> eventFactory, MultiProducerSequencer
sequencer) {
+ super(eventFactory, sequencer);
+ }
+
+ /**
+ * Create a multi-producer RingBuffer
+ *
+ * <p>Supports concurrent publishing from multiple threads using lock-free
CAS operations
+ *
+ * @param factory event factory for creating event instances
+ * @param bufferSize buffer size (must be power of 2)
+ * @param <E> event type
+ * @return newly created ring buffer
+ */
+ public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,
int bufferSize) {
+ MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize,
new Sequence[0]);
+ return new RingBuffer<>(factory, sequencer);
+ }
+
+ /**
+ * Get the event at a specific sequence
+ *
+ * @param sequence sequence number to retrieve
+ * @return event at the given sequence
+ */
+ public E get(long sequence) {
+ return elementAt(sequence);
+ }
+
+ /**
+ * Claim the next sequence for publishing
+ *
+ * <p>Blocks if buffer is full until space becomes available
+ *
+ * @return claimed sequence number
+ */
+ public long next() {
+ return sequencer.next(1);
+ }
+
+ /**
+ * Claim next n sequences for batch publishing
+ *
+ * @param n number of sequences to claim
+ * @return highest claimed sequence number
+ */
+ public long next(int n) {
+ return sequencer.next(n);
+ }
+
+ /**
+ * Publish a single sequence
+ *
+ * <p>Makes the event at this sequence visible to consumers
+ *
+ * @param sequence sequence to publish
+ */
+ public void publish(long sequence) {
+ sequencer.publish(sequence);
+ }
+
+ /**
+ * Publish a batch of sequences
+ *
+ * @param lo lowest sequence in the batch (inclusive)
+ * @param hi highest sequence in the batch (inclusive)
+ */
+ public void publish(long lo, long hi) {
+ sequencer.publish(lo, hi);
+ }
+
+ /**
+ * Publish event using a translator function
+ *
+ * <p>Provides a higher-level API for publishing events with custom
translation logic
+ *
+ * @param translator function to populate the event
+ * @param arg0 argument passed to translator
+ * @param <A> argument type
+ */
+ public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) {
+ final long sequence = sequencer.next(1);
+ translateAndPublish(translator, sequence, arg0);
+ }
+
+ /**
+ * Translate event and publish atomically
+ *
+ * @param translator event translator function
+ * @param sequence claimed sequence number
+ * @param arg0 argument for translation
+ * @param <A> argument type
+ */
+ private <A> void translateAndPublish(EventTranslator<E, A> translator, long
sequence, A arg0) {
+ try {
+ translator.translateTo(get(sequence), sequence, arg0);
+ } finally {
+ sequencer.publish(sequence);
+ }
+ }
+
+ /**
+ * Add gating sequences for consumer tracking
+ *
+ * <p>Gating sequences represent consumer progress and prevent overwriting
unprocessed events
+ *
+ * @param gatingSequences consumer sequences to track
+ */
+ public void addGatingSequences(Sequence... gatingSequences) {
+ sequencer.addGatingSequences(gatingSequences);
+ }
+
+ /**
+ * Create a sequence barrier for consumers
+ *
+ * <p>Barrier coordinates when events become available for processing
+ *
+ * @param sequencesToTrack upstream sequences to wait for
+ * @return new sequence barrier
+ */
+ public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
+ return sequencer.newBarrier(sequencesToTrack);
+ }
+
+ /**
+ * Get current producer cursor position
+ *
+ * @return current cursor value
+ */
+ public long getCursor() {
+ return sequencer.getCursor().get();
+ }
+
+ /**
+ * Get the buffer size
+ *
+ * @return configured buffer size
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Get remaining capacity in the buffer
+ *
+ * @return number of available slots
+ */
+ public long remainingCapacity() {
+ return sequencer.remainingCapacity();
+ }
+
+ /**
+ * Function interface for translating data into events
+ *
+ * @param <E> event type
+ * @param <A> argument type
+ */
+ @FunctionalInterface
+ public interface EventTranslator<E, A> {
+ /**
+ * Translate argument into event
+ *
+ * @param event pre-allocated event to populate
+ * @param sequence sequence number for this event
+ * @param arg source data
+ */
+ void translateTo(E event, long sequence, A arg);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java
new file mode 100644
index 00000000000..1f1d3445969
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Left-hand side padding for cache line alignment */
+class LhsPadding {
+ protected long p1, p2, p3, p4, p5, p6, p7;
+}
+
+/** Value class holding the actual sequence */
+class Value extends LhsPadding {
+ protected AtomicLong value = new AtomicLong();
+}
+
+/** Right-hand side padding for cache line alignment */
+class RhsPadding extends Value {
+ protected long p9, p10, p11, p12, p13, p14, p15;
+}
+
+/**
+ * Lock-free sequence counter with cache line padding
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and preserves the core sequence tracking mechanism for IoTDB's Pipe module.
+ *
+ * <p>Key design features:
+ *
+ * <ul>
+ * <li>Three-level inheritance ensures proper field ordering for padding
+ * <li>Uses AtomicLong for thread-safe atomic operations
+ * <li>Cache line padding prevents false sharing between CPU cores
+ * <li>Supports both ordered writes (cheaper) and volatile writes (stronger)
+ * </ul>
+ */
+public class Sequence extends RhsPadding {
+ public static final long INITIAL_VALUE = -1L;
+
+ /** Create sequence with initial value -1 */
+ public Sequence() {
+ value.set(INITIAL_VALUE);
+ }
+
+ /** Volatile read */
+ public long get() {
+ return value.get();
+ }
+
+ /**
+ * Ordered write (store-store barrier only)
+ *
+ * <p>CRITICAL: Cheaper than volatile write, sufficient for most cases
+ */
+ public void set(final long value) {
+ this.value.set(value);
+ }
+
+ /**
+ * CAS operation - CORE for lock-free design
+ *
+ * @param expectedValue expected current value
+ * @param newValue new value
+ * @return true if successful
+ */
+ public boolean compareAndSet(final long expectedValue, final long newValue) {
+ return value.compareAndSet(expectedValue, newValue);
+ }
+
+ /** Atomically increment */
+ public long incrementAndGet() {
+ return addAndGet(1L);
+ }
+
+ /** Atomically add */
+ public long addAndGet(final long increment) {
+ long currentValue;
+ long newValue;
+
+ do {
+ currentValue = get();
+ newValue = currentValue + increment;
+ } while (!compareAndSet(currentValue, newValue));
+
+ return newValue;
+ }
+
+ @Override
+ public String toString() {
+ return Long.toString(get());
+ }
+
+ /** Get minimum sequence from array - CORE utility method */
+ public static long getMinimumSequence(final Sequence[] sequences, long
minimum) {
+ for (int i = 0, n = sequences.length; i < n; i++) {
+ long value = sequences[i].get();
+ minimum = Math.min(minimum, value);
+ }
+ return minimum;
+ }
+
+ public static long getMinimumSequence(final Sequence[] sequences) {
+ return getMinimumSequence(sequences, Long.MAX_VALUE);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
new file mode 100644
index 00000000000..80f41162fc7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+/**
+ * Sequence barrier for consumer coordination
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and simplified for IoTDB's Pipe module (removed Alert mechanism - IoTDB
doesn't need it).
+ *
+ * <p>Core features preserved from LMAX Disruptor:
+ *
+ * <ul>
+ * <li>waitFor() logic for waiting sequences
+ * <li>Scan available buffer for out-of-order publishing
+ * </ul>
+ */
+public class SequenceBarrier {
+ private final MultiProducerSequencer sequencer;
+ private final Sequence[] dependentSequences;
+
+ public SequenceBarrier(MultiProducerSequencer sequencer, Sequence[]
dependentSequences) {
+ this.sequencer = sequencer;
+ this.dependentSequences = dependentSequences != null ? dependentSequences
: new Sequence[0];
+ }
+
+ /**
+ * CORE: Wait for sequence to become available (MUST keep logic)
+ *
+ * @param sequence sequence to wait for
+ * @return highest available sequence
+ * @throws InterruptedException if interrupted
+ */
+ public long waitFor(long sequence) throws InterruptedException {
+ // Wait for cursor
+ long availableSequence;
+ while ((availableSequence = sequencer.getCursor().get()) < sequence) {
+ Thread.sleep(1);
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ }
+
+ // Wait for dependent sequences
+ if (dependentSequences.length > 0) {
+ while (Sequence.getMinimumSequence(dependentSequences) < sequence) {
+ Thread.sleep(1);
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ }
+ }
+
+ // CORE: Scan available buffer for highest continuously published sequence
+ return sequencer.getHighestPublishedSequence(sequence, availableSequence);
+ }
+
+ public long getCursor() {
+ return sequencer.getCursor().get();
+ }
+
+ public long getHighestPublishedSequence(long lowerBound, long
availableSequence) {
+ return sequencer.getHighestPublishedSequence(lowerBound,
availableSequence);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
new file mode 100644
index 00000000000..af5039070f0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Utility for atomic management of sequence arrays
+ *
+ * <p>This implementation is based on LMAX Disruptor
(https://github.com/LMAX-Exchange/disruptor)
+ * and adapted for IoTDB's Pipe module.
+ *
+ * <p>Provides thread-safe operations for adding and removing sequences from
gating sequence arrays
+ * used to track consumer progress.
+ */
+final class SequenceGroups {
+
+ /** Field updater for atomic array replacement */
+ private static final AtomicReferenceFieldUpdater<MultiProducerSequencer,
Sequence[]>
+ SEQUENCE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(
+ MultiProducerSequencer.class, Sequence[].class,
"gatingSequences");
+
+ /**
+ * Atomically add sequences to the gating sequence array
+ *
+ * <p>Uses CAS loop to ensure thread-safe addition even under concurrent
modification
+ *
+ * @param sequencer the multi-producer sequencer
+ * @param cursor the current cursor sequence
+ * @param sequencesToAdd sequences to add
+ */
+ static void addSequences(
+ final MultiProducerSequencer sequencer,
+ final Sequence cursor,
+ final Sequence... sequencesToAdd) {
+ long cursorSequence;
+ Sequence[] updatedSequences;
+ Sequence[] currentSequences;
+
+ do {
+ currentSequences = sequencer.gatingSequences;
+ updatedSequences = new Sequence[currentSequences.length +
sequencesToAdd.length];
+ System.arraycopy(currentSequences, 0, updatedSequences, 0,
currentSequences.length);
+
+ cursorSequence = cursor.get();
+
+ int index = currentSequences.length;
+ for (Sequence sequence : sequencesToAdd) {
+ sequence.set(cursorSequence);
+ updatedSequences[index++] = sequence;
+ }
+ } while (!SEQUENCE_UPDATER.compareAndSet(sequencer, currentSequences,
updatedSequences));
+
+ cursorSequence = cursor.get();
+ for (Sequence sequence : sequencesToAdd) {
+ sequence.set(cursorSequence);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index d6cfa6f6abc..aaa98220178 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -68,26 +68,35 @@ public class PipeInsertionDataNodeListener {
}
public synchronized void stopListenAndAssign(
- String dataRegionId, PipeRealtimeDataRegionSource extractor) {
- final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
- if (assigner == null) {
- return;
- }
-
- assigner.stopAssignTo(extractor);
-
- if (extractor.isNeedListenToTsFile()) {
- listenToTsFileExtractorCount.decrementAndGet();
- }
- if (extractor.isNeedListenToInsertNode()) {
- listenToInsertNodeExtractorCount.decrementAndGet();
+ final String dataRegionId, final PipeRealtimeDataRegionSource extractor)
{
+ PipeDataRegionAssigner assignerToClose = null;
+
+ synchronized (this) {
+ final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
+ if (assigner == null) {
+ return;
+ }
+
+ assigner.stopAssignTo(extractor);
+
+ if (extractor.isNeedListenToTsFile()) {
+ listenToTsFileExtractorCount.decrementAndGet();
+ }
+ if (extractor.isNeedListenToInsertNode()) {
+ listenToInsertNodeExtractorCount.decrementAndGet();
+ }
+
+ if (assigner.notMoreSourceNeededToBeAssigned()) {
+ // The removed assigner will is the same as the one referenced by the
variable `assigner`
+ dataRegionId2Assigner.remove(dataRegionId);
+ // This will help to release the memory occupied by the assigner
+ assignerToClose = assigner;
+ }
}
- if (assigner.notMoreExtractorNeededToBeAssigned()) {
- // The removed assigner will is the same as the one referenced by the
variable `assigner`
- dataRegionId2Assigner.remove(dataRegionId);
- // This will help to release the memory occupied by the assigner
- assigner.close();
+ if (assignerToClose != null) {
+ // Closing the disruptor may block for a while, so keep it out of the
global listener lock.
+ assignerToClose.close();
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
new file mode 100644
index 00000000000..3fd40c4d4f2
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DisruptorShutdownTest {
+
+ @Test
+ public void
testBatchEventProcessorDrainsPublishedEventsOnShutdownInterrupt() throws
Exception {
+ final RingBuffer<TestEvent> ringBuffer =
RingBuffer.createMultiProducer(TestEvent::new, 32);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
1);
+
+ final TestSequenceBarrier barrier = new TestSequenceBarrier(0L);
+ final AtomicInteger handledEventCount = new AtomicInteger();
+ final BatchEventProcessor<TestEvent> processor =
+ new BatchEventProcessor<>(
+ ringBuffer,
+ barrier,
+ (event, sequence, endOfBatch) ->
handledEventCount.incrementAndGet());
+
+ final Thread processorThread = new Thread(processor,
"pipe-batch-event-processor-test");
+ processorThread.start();
+
+ Assert.assertTrue(barrier.awaitWaitForCall());
+ processor.halt();
+ barrier.interruptWait();
+
+ processorThread.join(TimeUnit.SECONDS.toMillis(5));
+
+ Assert.assertFalse(processorThread.isAlive());
+ Assert.assertEquals(1, handledEventCount.get());
+ Assert.assertEquals(0L, processor.getSequence().get());
+ }
+
+ @Test
+ public void
testBatchEventProcessorDrainsEventsPublishedAfterCurrentBatchWhenHalting()
+ throws Exception {
+ final RingBuffer<TestEvent> ringBuffer =
RingBuffer.createMultiProducer(TestEvent::new, 32);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
1);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
2);
+
+ final SnapshotSequenceBarrier barrier = new SnapshotSequenceBarrier(0L,
1L);
+ final AtomicInteger handledEventCount = new AtomicInteger();
+ final AtomicReference<BatchEventProcessor<TestEvent>> processorReference =
+ new AtomicReference<>();
+ final BatchEventProcessor<TestEvent> processor =
+ new BatchEventProcessor<>(
+ ringBuffer,
+ barrier,
+ (event, sequence, endOfBatch) -> {
+ handledEventCount.incrementAndGet();
+ if (event.value == 1) {
+ processorReference.get().halt();
+ }
+ });
+ processorReference.set(processor);
+
+ final Thread processorThread =
+ new Thread(processor, "pipe-batch-event-processor-snapshot-test");
+ processorThread.start();
+ processorThread.join(TimeUnit.SECONDS.toMillis(5));
+
+ Assert.assertFalse(processorThread.isAlive());
+ Assert.assertEquals(2, handledEventCount.get());
+ Assert.assertEquals(1L, processor.getSequence().get());
+ }
+
+ @Test
+ public void testDisruptorShutdownInterruptsWaitingProcessor() throws
Exception {
+ final AtomicReference<Thread> processorThreadReference = new
AtomicReference<>();
+ final ThreadFactory threadFactory =
+ runnable -> {
+ final Thread thread = new Thread(runnable,
"pipe-disruptor-shutdown-test");
+ processorThreadReference.set(thread);
+ return thread;
+ };
+
+ final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32,
threadFactory);
+ disruptor.handleEventsWith((event, sequence, endOfBatch) -> {});
+ disruptor.start();
+
+ final Thread processorThread = processorThreadReference.get();
+ Assert.assertNotNull(processorThread);
+
+ TimeUnit.MILLISECONDS.sleep(50);
+ disruptor.shutdown();
+
+ Assert.assertFalse(processorThread.isAlive());
+ }
+
+ private static class TestEvent {
+ private int value;
+ }
+
+ private static class TestSequenceBarrier extends SequenceBarrier {
+
+ private final long cursor;
+ private final CountDownLatch waitForCalled = new CountDownLatch(1);
+ private final CountDownLatch interruptWait = new CountDownLatch(1);
+
+ private TestSequenceBarrier(final long cursor) {
+ super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
+ this.cursor = cursor;
+ }
+
+ @Override
+ public long waitFor(final long sequence) throws InterruptedException {
+ waitForCalled.countDown();
+ interruptWait.await();
+ throw new InterruptedException();
+ }
+
+ @Override
+ public long getCursor() {
+ return cursor;
+ }
+
+ @Override
+ public long getHighestPublishedSequence(final long lowerBound, final long
availableSequence) {
+ return availableSequence;
+ }
+
+ private boolean awaitWaitForCall() throws InterruptedException {
+ return waitForCalled.await(5, TimeUnit.SECONDS);
+ }
+
+ private void interruptWait() {
+ interruptWait.countDown();
+ }
+ }
+
+ private static class SnapshotSequenceBarrier extends SequenceBarrier {
+
+ private final long waitForResult;
+ private final long cursor;
+
+ private SnapshotSequenceBarrier(final long waitForResult, final long
cursor) {
+ super(new MultiProducerSequencer(32, new Sequence[0]), new Sequence[0]);
+ this.waitForResult = waitForResult;
+ this.cursor = cursor;
+ }
+
+ @Override
+ public long waitFor(final long sequence) {
+ return waitForResult;
+ }
+
+ @Override
+ public long getCursor() {
+ return cursor;
+ }
+
+ @Override
+ public long getHighestPublishedSequence(final long lowerBound, final long
availableSequence) {
+ return availableSequence;
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 294a1231e35..cbe907c71b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,6 @@
<commons-pool2.version>2.11.1</commons-pool2.version>
<commons.collections4.version>4.4</commons.collections4.version>
<ctest.skip.tests>false</ctest.skip.tests>
- <disruptor.version>3.4.4</disruptor.version>
<drill.freemarker.maven.plugin.version>1.21.1</drill.freemarker.maven.plugin.version>
<dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
<eclipse-collections.version>11.1.0</eclipse-collections.version>
@@ -454,11 +453,6 @@
<artifactId>h2-mvstore</artifactId>
<version>${h2.version}</version>
</dependency>
- <dependency>
- <groupId>com.lmax</groupId>
- <artifactId>disruptor</artifactId>
- <version>${disruptor.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>