This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch interrupt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8fe849985e6f0403d1b38945eb39319d892bcee7 Author: Caideyipi <[email protected]> AuthorDate: Thu May 14 21:13:37 2026 +0800 gix --- .../realtime/disruptor/BatchEventProcessor.java | 11 +++++--- .../realtime/disruptor/DisruptorShutdownTest.java | 30 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java index d0432821cf7..6a3b2cc62fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java @@ -80,11 +80,14 @@ public final class BatchEventProcessor<T> implements Runnable { nextSequence = processAvailableEvents(nextSequence, availableSequence); } catch (final InterruptedException ex) { - if (running) { - Thread.currentThread().interrupt(); - LOGGER.info("Processor interrupted"); + if (!running) { + break; } - break; + // A transient interrupt should not permanently stop the consumer thread. Otherwise the + // gating sequence will stop advancing and producers may block forever on a full ring + // buffer, making the later close path appear stuck. + Thread.interrupted(); + LOGGER.warn("Processor interrupted unexpectedly, continue running"); } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence)); sequence.set(nextSequence); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java index 3fd40c4d4f2..9bdcf42de7d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java @@ -113,6 +113,36 @@ public class DisruptorShutdownTest { Assert.assertFalse(processorThread.isAlive()); } + @Test + public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception { + final AtomicReference<Thread> processorThreadReference = new AtomicReference<>(); + final ThreadFactory threadFactory = + runnable -> { + final Thread thread = + new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test"); + processorThreadReference.set(thread); + return thread; + }; + + final CountDownLatch handled = new CountDownLatch(1); + final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory); + final RingBuffer<TestEvent> ringBuffer = + disruptor.handleEventsWith((event, sequence, endOfBatch) -> handled.countDown()).start(); + + final Thread processorThread = processorThreadReference.get(); + Assert.assertNotNull(processorThread); + + TimeUnit.MILLISECONDS.sleep(50); + processorThread.interrupt(); + + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + Assert.assertTrue(handled.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(processorThread.isAlive()); + + disruptor.shutdown(); + Assert.assertFalse(processorThread.isAlive()); + } + private static class TestEvent { private int value; }
