This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch interrupt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca9a8a7613569a421abef8ca7e5d0948f5fc0fba Author: Caideyipi <[email protected]> AuthorDate: Thu May 14 21:13:37 2026 +0800 Fix interrupt --- .../realtime/assigner/DisruptorQueue.java | 19 ++++++-- .../realtime/assigner/PipeDataRegionAssigner.java | 41 ++++++++++++++-- .../realtime/disruptor/BatchEventProcessor.java | 11 +++-- .../realtime/disruptor/MultiProducerSequencer.java | 21 ++++++++ .../dataregion/realtime/disruptor/RingBuffer.java | 24 ++++++++- .../realtime/disruptor/DisruptorShutdownTest.java | 57 ++++++++++++++++++++++ 6 files changed, 160 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index 2019eba8560..d3fc92a677a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -87,16 +87,29 @@ public class DisruptorQueue { } public void publish(final PipeRealtimeEvent event) { + publishOrDrop(event); + } + + public boolean publishOrDrop(final PipeRealtimeEvent event) { final EnrichedEvent innerEvent = event.getEvent(); if (innerEvent instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer); } - ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); - mayPrintExceedingLog(); + final boolean published = + ringBuffer.publishEvent( + (container, sequence, o) -> container.setEvent(event), event, this::isClosed); + if (published) { + mayPrintExceedingLog(); + } + return published; } - public void shutdown() { + public void closeInput() { isClosed = true; + } + + public void shutdown() { + closeInput(); // use shutdown instead of halt to ensure all published events have been handled disruptor.shutdown(); allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 9c356e8654b..c9e65bc59c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -67,6 +67,7 @@ public class PipeDataRegionAssigner implements Closeable { private Boolean isTableModel; private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); + private int inFlightPublishCount = 0; public int getDataRegionId() { return dataRegionId; @@ -101,14 +102,28 @@ public class PipeDataRegionAssigner implements Closeable { ((PipeHeartbeatEvent) innerEvent).onPublished(); } - // use synchronized here for completely preventing reference count leaks under extreme thread - // scheduling when closing synchronized (this) { - if (!disruptor.isClosed()) { - disruptor.publish(event); - } else { + if (disruptor.isClosed()) { onAssignedHook(event); + return; } + inFlightPublishCount++; + } + + boolean isPublished = false; + try { + isPublished = disruptor.publishOrDrop(event); + } finally { + synchronized (this) { + inFlightPublishCount--; + if (inFlightPublishCount == 0) { + notifyAll(); + } + } + } + + if (!isPublished) { + onAssignedHook(event); } } @@ -254,9 +269,25 @@ public class PipeDataRegionAssigner implements Closeable { public synchronized void close() { PipeAssignerMetrics.getInstance().deregister(dataRegionId); + boolean interrupted = false; + disruptor.closeInput(); + while (inFlightPublishCount > 0) { + try { + wait(); + } catch (final InterruptedException e) { + interrupted = true; + LOGGER.warn( + "Interrupted while waiting for in-flight publishes to finish when closing assigner on data region {}.", + dataRegionId); + } + } + final long startTime = System.currentTimeMillis(); disruptor.shutdown(); matcher.clear(); + if (interrupted) { + Thread.currentThread().interrupt(); + } LOGGER.info( "Pipe: Assigner on data region {} shutdown internal disruptor within {} ms", dataRegionId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java index d0432821cf7..6a3b2cc62fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java @@ -80,11 +80,14 @@ public final class BatchEventProcessor<T> implements Runnable { nextSequence = processAvailableEvents(nextSequence, availableSequence); } catch (final InterruptedException ex) { - if (running) { - Thread.currentThread().interrupt(); - LOGGER.info("Processor interrupted"); + if (!running) { + break; } - break; + // A transient interrupt should not permanently stop the consumer thread. Otherwise the + // gating sequence will stop advancing and producers may block forever on a full ring + // buffer, making the later close path appear stuck. + Thread.interrupted(); + LOGGER.warn("Processor interrupted unexpectedly, continue running"); } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence)); sequence.set(nextSequence); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java index 9aa7716a7a5..457593ca979 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import java.util.function.BooleanSupplier; /** * Multi-producer sequencer for coordinating concurrent publishers @@ -110,14 +111,31 @@ public final class MultiProducerSequencer { * @return highest claimed sequence number */ public long next(int n) { + return next(n, () -> false); + } + + /** + * Claim next n sequences for publishing, or abort if the caller is closing. + * + * @param n number of sequences to claim + * @param abortCondition returns {@code true} if the claim should be abandoned + * @return highest claimed sequence number, or {@link Sequence#INITIAL_VALUE} if aborted + */ + public long next(final int n, final BooleanSupplier abortCondition) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } + final BooleanSupplier effectiveAbortCondition = + abortCondition != null ? abortCondition : () -> false; long current; long next; do { + if (effectiveAbortCondition.getAsBoolean()) { + return Sequence.INITIAL_VALUE; + } + current = cursor.get(); next = current + n; @@ -128,6 +146,9 @@ public final class MultiProducerSequencer { long gatingSequence = Sequence.getMinimumSequence(gatingSequences.get(), current); if (wrapPoint > gatingSequence) { + if (effectiveAbortCondition.getAsBoolean()) { + return Sequence.INITIAL_VALUE; + } LockSupport.parkNanos(1); continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java index 2af784b603d..0b9e98400fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; +import java.util.function.BooleanSupplier; + /** * Left-hand side padding for cache line alignment * @@ -205,8 +207,28 @@ public final class RingBuffer<E> extends RingBufferFields<E> { * @param <A> argument type */ public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) { - final long sequence = sequencer.next(1); + publishEvent(translator, arg0, () -> false); + } + + /** + * Publish event using a translator function, or abort if the caller is closing. + * + * @param translator function to populate the event + * @param arg0 argument passed to translator + * @param abortCondition returns {@code true} if the publish should be abandoned + * @param <A> argument type + * @return {@code true} if the event is published, {@code false} if the publish is aborted + */ + public <A> boolean publishEvent( + final EventTranslator<E, A> translator, + final A arg0, + final BooleanSupplier abortCondition) { + final long sequence = sequencer.next(1, abortCondition); + if (sequence == Sequence.INITIAL_VALUE) { + return false; + } translateAndPublish(translator, sequence, arg0); + return true; } /** diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java index 3fd40c4d4f2..b2bebde2010 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -113,6 +114,62 @@ public class DisruptorShutdownTest { Assert.assertFalse(processorThread.isAlive()); } + @Test + public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception { + final AtomicReference<Thread> processorThreadReference = new AtomicReference<>(); + final ThreadFactory threadFactory = + runnable -> { + final Thread thread = + new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test"); + processorThreadReference.set(thread); + return thread; + }; + + final CountDownLatch handled = new CountDownLatch(1); + final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory); + final RingBuffer<TestEvent> ringBuffer = + disruptor.handleEventsWith((event, sequence, endOfBatch) -> handled.countDown()).start(); + + final Thread processorThread = processorThreadReference.get(); + Assert.assertNotNull(processorThread); + + TimeUnit.MILLISECONDS.sleep(50); + processorThread.interrupt(); + + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + Assert.assertTrue(handled.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(processorThread.isAlive()); + + disruptor.shutdown(); + Assert.assertFalse(processorThread.isAlive()); + } + + @Test + public void testPublishEventCanAbortWhenClosingWhileBufferIsFull() throws Exception { + final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 1); + final Sequence gatingSequence = new Sequence(); + ringBuffer.addGatingSequences(gatingSequence); + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + + final AtomicBoolean isClosed = new AtomicBoolean(false); + final AtomicBoolean published = new AtomicBoolean(true); + final Thread publisherThread = + new Thread( + () -> + published.set( + ringBuffer.publishEvent( + (event, sequence, value) -> event.value = value, 2, isClosed::get)), + "pipe-disruptor-publish-abort-test"); + + publisherThread.start(); + TimeUnit.MILLISECONDS.sleep(50); + isClosed.set(true); + publisherThread.join(TimeUnit.SECONDS.toMillis(5)); + + Assert.assertFalse(publisherThread.isAlive()); + Assert.assertFalse(published.get()); + } + private static class TestEvent { private int value; }
