This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch backport/17673-dev-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 00463d689d9952576782941de6e30369aecc1461 Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 2 14:29:00 2026 +0800 Pipe: Fixed the bug that drop pipe may stuck when disruptor is interrupted or ring buffer full (#17673) * Fix interrupt * spotless (cherry picked from commit b614d6d16b5ab2ed50216028e4467b165004f6fb) --- .../realtime/assigner/DisruptorQueue.java | 19 ++++++-- .../realtime/assigner/PipeDataRegionAssigner.java | 41 ++++++++++++++-- .../realtime/disruptor/BatchEventProcessor.java | 11 +++-- .../realtime/disruptor/MultiProducerSequencer.java | 21 ++++++++ .../dataregion/realtime/disruptor/RingBuffer.java | 22 ++++++++- .../realtime/disruptor/DisruptorShutdownTest.java | 56 ++++++++++++++++++++++ 6 files changed, 157 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index 2019eba8560..d3fc92a677a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -87,16 +87,29 @@ public class DisruptorQueue { } public void publish(final PipeRealtimeEvent event) { + publishOrDrop(event); + } + + public boolean publishOrDrop(final PipeRealtimeEvent event) { final EnrichedEvent innerEvent = event.getEvent(); if (innerEvent instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer); } - ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); - mayPrintExceedingLog(); + final boolean published = + ringBuffer.publishEvent( + (container, sequence, o) -> container.setEvent(event), event, this::isClosed); + if (published) { + mayPrintExceedingLog(); + } + return published; } - public void shutdown() { + public void closeInput() { isClosed = true; + } + + public void shutdown() { + closeInput(); // use shutdown instead of halt to ensure all published events have been handled disruptor.shutdown(); allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 2375726e427..01dcbb89265 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -60,6 +60,7 @@ public class PipeDataRegionAssigner implements Closeable { private volatile int listenToInsertNodeSourceCount = 0; private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); + private int inFlightPublishCount = 0; public String getDataRegionId() { return dataRegionId; @@ -87,14 +88,28 @@ public class PipeDataRegionAssigner implements Closeable { ((PipeHeartbeatEvent) innerEvent).onPublished(); } - // use synchronized here for completely preventing reference count leaks under extreme thread - // scheduling when closing synchronized (this) { - if (!disruptor.isClosed()) { - disruptor.publish(event); - } else { + if (disruptor.isClosed()) { onAssignedHook(event); + return; } + inFlightPublishCount++; + } + + boolean isPublished = false; + try { + isPublished = disruptor.publishOrDrop(event); + } finally { + synchronized (this) { + inFlightPublishCount--; + if (inFlightPublishCount == 0) { + notifyAll(); + } + } + } + + if (!isPublished) { + onAssignedHook(event); } } @@ -241,9 +256,25 @@ public class PipeDataRegionAssigner implements Closeable { public synchronized void close() { PipeAssignerMetrics.getInstance().deregister(dataRegionId); + boolean interrupted = false; + disruptor.closeInput(); + while (inFlightPublishCount > 0) { + try { + wait(); + } catch (final InterruptedException e) { + interrupted = true; + LOGGER.warn( + "Interrupted while waiting for in-flight publishes to finish when closing assigner on data region {}.", + dataRegionId); + } + } + final long startTime = System.currentTimeMillis(); disruptor.shutdown(); matcher.clear(); + if (interrupted) { + Thread.currentThread().interrupt(); + } LOGGER.info( "Pipe: Assigner on data region {} shutdown internal disruptor within {} ms", dataRegionId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java index d0432821cf7..6a3b2cc62fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java @@ -80,11 +80,14 @@ public final class BatchEventProcessor<T> implements Runnable { nextSequence = processAvailableEvents(nextSequence, availableSequence); } catch (final InterruptedException ex) { - if (running) { - Thread.currentThread().interrupt(); - LOGGER.info("Processor interrupted"); + if (!running) { + break; } - break; + // A transient interrupt should not permanently stop the consumer thread. Otherwise the + // gating sequence will stop advancing and producers may block forever on a full ring + // buffer, making the later close path appear stuck. + Thread.interrupted(); + LOGGER.warn("Processor interrupted unexpectedly, continue running"); } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence)); sequence.set(nextSequence); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java index d40ed968398..79d205d670c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.locks.LockSupport; +import java.util.function.BooleanSupplier; /** * Multi-producer sequencer for coordinating concurrent publishers @@ -108,14 +109,31 @@ public final class MultiProducerSequencer { * @return highest claimed sequence number */ public long next(int n) { + return next(n, () -> false); + } + + /** + * Claim next n sequences for publishing, or abort if the caller is closing. + * + * @param n number of sequences to claim + * @param abortCondition returns {@code true} if the claim should be abandoned + * @return highest claimed sequence number, or {@link Sequence#INITIAL_VALUE} if aborted + */ + public long next(final int n, final BooleanSupplier abortCondition) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } + final BooleanSupplier effectiveAbortCondition = + abortCondition != null ? abortCondition : () -> false; long current; long next; do { + if (effectiveAbortCondition.getAsBoolean()) { + return Sequence.INITIAL_VALUE; + } + current = cursor.get(); next = current + n; @@ -126,6 +144,9 @@ public final class MultiProducerSequencer { long gatingSequence = Sequence.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { + if (effectiveAbortCondition.getAsBoolean()) { + return Sequence.INITIAL_VALUE; + } LockSupport.parkNanos(1); continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java index 2af784b603d..25f7fe7d1b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; +import java.util.function.BooleanSupplier; + /** * Left-hand side padding for cache line alignment * @@ -205,8 +207,26 @@ public final class RingBuffer<E> extends RingBufferFields<E> { * @param <A> argument type */ public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) { - final long sequence = sequencer.next(1); + publishEvent(translator, arg0, () -> false); + } + + /** + * Publish event using a translator function, or abort if the caller is closing. + * + * @param translator function to populate the event + * @param arg0 argument passed to translator + * @param abortCondition returns {@code true} if the publish should be abandoned + * @param <A> argument type + * @return {@code true} if the event is published, {@code false} if the publish is aborted + */ + public <A> boolean publishEvent( + final EventTranslator<E, A> translator, final A arg0, final BooleanSupplier abortCondition) { + final long sequence = sequencer.next(1, abortCondition); + if (sequence == Sequence.INITIAL_VALUE) { + return false; + } translateAndPublish(translator, sequence, arg0); + return true; } /** diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java index 3fd40c4d4f2..ef57ea625ae 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -113,6 +114,61 @@ public class DisruptorShutdownTest { Assert.assertFalse(processorThread.isAlive()); } + @Test + public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception { + final AtomicReference<Thread> processorThreadReference = new AtomicReference<>(); + final ThreadFactory threadFactory = + runnable -> { + final Thread thread = new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test"); + processorThreadReference.set(thread); + return thread; + }; + + final CountDownLatch handled = new CountDownLatch(1); + final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory); + final RingBuffer<TestEvent> ringBuffer = + disruptor.handleEventsWith((event, sequence, endOfBatch) -> handled.countDown()).start(); + + final Thread processorThread = processorThreadReference.get(); + Assert.assertNotNull(processorThread); + + TimeUnit.MILLISECONDS.sleep(50); + processorThread.interrupt(); + + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + Assert.assertTrue(handled.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(processorThread.isAlive()); + + disruptor.shutdown(); + Assert.assertFalse(processorThread.isAlive()); + } + + @Test + public void testPublishEventCanAbortWhenClosingWhileBufferIsFull() throws Exception { + final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 1); + final Sequence gatingSequence = new Sequence(); + ringBuffer.addGatingSequences(gatingSequence); + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + + final AtomicBoolean isClosed = new AtomicBoolean(false); + final AtomicBoolean published = new AtomicBoolean(true); + final Thread publisherThread = + new Thread( + () -> + published.set( + ringBuffer.publishEvent( + (event, sequence, value) -> event.value = value, 2, isClosed::get)), + "pipe-disruptor-publish-abort-test"); + + publisherThread.start(); + TimeUnit.MILLISECONDS.sleep(50); + isClosed.set(true); + publisherThread.join(TimeUnit.SECONDS.toMillis(5)); + + Assert.assertFalse(publisherThread.isAlive()); + Assert.assertFalse(published.get()); + } + private static class TestEvent { private int value; }
