This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c39061ae458 Subscription: Drain TsFile batches before termination (#17926)
c39061ae458 is described below
commit c39061ae4586f1b3c25f96e61d09f62a3326a2eb
Author: VGalaxies <[email protected]>
AuthorDate: Tue Jun 16 09:50:01 2026 +0800
Subscription: Drain TsFile batches before termination (#17926)
* Subscription: drain tsfile batches before termination
* Subscription: address termination drain races
* Subscription: add tsfile termination drain IT
* Subscription: fix CI quality warnings
* Subscription: address review comments
---
.../dual/tablemodel/IoTDBSubscriptionTopicIT.java | 86 +++++++++
.../apache/iotdb/db/i18n/DataNodeMiscMessages.java | 5 +
.../apache/iotdb/db/i18n/DataNodeMiscMessages.java | 4 +
.../broker/SubscriptionPrefetchingQueue.java | 199 +++++++++++----------
.../event/batch/SubscriptionPipeEventBatch.java | 23 ++-
.../event/batch/SubscriptionPipeEventBatches.java | 29 +++
6 files changed, 246 insertions(+), 100 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
index f3c0dd29f46..c71a7b2319b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java
@@ -56,6 +56,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
@@ -93,6 +94,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.getConfig()
.getCommonConfig()
.setPipeMetaSyncerSyncIntervalMinutes(1)
+ .setSubscriptionPrefetchTsFileBatchMaxDelayInMs(600_000)
+ .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(64 * 1024 * 1024)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
}
@@ -382,6 +385,89 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
}
+ /**
+ * Checks that a snapshot-mode topic using the TsFile handler format delivers the historical rows
+ * to the consumer, and that the subscription is gone from the sender afterwards.
+ *
+ * <p>NOTE(review): this presumably relies on the very large TsFile batch delay/size set in the
+ * environment setup, so the buffered batch is flushed only by the termination drain and not by
+ * its normal emit thresholds — confirm that setup applies to the sender environment.
+ */
+ @Test
+ public void testTsFileSnapshotDrainsPendingBatchBeforeTermination() throws
Exception {
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
+ TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1");
+
+ // Insert historical data and create a closed TsFile before snapshot
subscription starts.
+ TableModelUtils.insertData("test1", "test1", 0, 10, senderEnv);
+
+ final String topicName = "topic_drain_tsfile_batch_before_termination";
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+ try (final ISubscriptionTableSession session =
+ new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
+ config.put(TopicConstant.DATABASE_KEY, "test1");
+ config.put(TopicConstant.TABLE_KEY, "test1");
+ // Force TsFile parsing so the snapshot data is buffered in
SubscriptionPipeTsFileEventBatch.
+ config.put(TopicConstant.START_TIME_KEY, 1);
+ session.createTopic(topicName, config);
+ }
+ assertTopicCount(1);
+
+ // Stop flag for the consumer thread, plus a holder that carries any consumer-thread failure
+ // back to the test thread so the await below can fail on it.
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final AtomicReference<Throwable> consumerFailure = new AtomicReference<>();
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final ISubscriptionTablePullConsumer consumer =
+ new SubscriptionTablePullConsumerBuilder()
+ .host(host)
+ .port(port)
+ .consumerId("c_drain")
+ .consumerGroupId("cg_drain")
+ .autoCommit(false)
+ .build();
+ final ITableSession session =
receiverEnv.getTableSessionConnection()) {
+ consumer.open();
+ consumer.subscribe(topicName);
+ // Poll, replay into the receiver, then commit explicitly (auto-commit is off).
+ while (!isClosed.get()) {
+ final List<SubscriptionMessage> messages =
+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ insertData(messages, session);
+ consumer.commitSync(messages);
+ }
+ } catch (final Throwable e) {
+ consumerFailure.set(e);
+ } finally {
+ LOGGER.info("draining consumer exiting...");
+ }
+ },
+ String.format("%s - draining consumer",
testName.getDisplayName()));
+ thread.start();
+
+ try {
+ // Flushes both sides; presumably invoked by assertData when a data check fails — confirm.
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+ // Passes once: no consumer-thread failure was recorded, the sender's config node reports zero
+ // subscriptions, and the receiver holds the expected rows (timestamps 1..10 per the call).
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertNull(consumerFailure.get());
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final TShowSubscriptionResp showSubscriptionResp =
+ client.showSubscription(new TShowSubscriptionReq());
+ Assert.assertEquals(
+ RpcUtils.SUCCESS_STATUS.getCode(),
showSubscriptionResp.status.getCode());
+ Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+ Assert.assertEquals(0,
showSubscriptionResp.subscriptionInfoList.size());
+ }
+
+ TableModelUtils.assertData("test1", "test1", 1, 10, receiverEnv,
handleFailure);
+ });
+ } finally {
+ // Always stop and join the consumer thread, even if the await failed.
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+
/////////////////////////////// utility ///////////////////////////////
private void assertTopicCount(final int count) throws Exception {
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
index 8eb26f05e0a..2f0a53db5c4 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
@@ -779,6 +779,11 @@ public final class DataNodeMiscMessages {
"Exception {} occurred when {} execute receiver subtask";
public static final String EXCEPTION_CONSTRUCT_TABLET_ITERATOR =
"Exception {} occurred when {} construct ToTabletIterator";
+ // WARN message used when emitting the remaining batched events fails before a
+ // PipeTerminateEvent can be committed. Arguments: the prefetching queue, the terminate event
+ // (the exception is passed as the trailing logger argument).
+ public static final String
EXCEPTION_EMIT_EVENTS_BEFORE_COMMIT_TERMINATE_EVENT =
+ "Subscription: SubscriptionPrefetchingQueue {} failed to emit remaining
events before "
+ + "committing PipeTerminateEvent {}";
+ // INFO message logged when a PipeTerminateEvent is committed. Arguments: the prefetching queue,
+ // the terminate event.
+ public static final String COMMIT_TERMINATE_EVENT =
+ "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent
{}";
//
---------------------------------------------------------------------------
// consensus – BaseStateMachine
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
index 69e4370b664..c5625447212 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
@@ -778,6 +778,10 @@ public final class DataNodeMiscMessages {
"异常 {} 在 {} 执行接收子任务时发生";
public static final String EXCEPTION_CONSTRUCT_TABLET_ITERATOR =
"异常 {} 在 {} 构造 ToTabletIterator 时发生";
+ // Chinese translation of the WARN message for a failed emit of remaining events before committing
+ // a PipeTerminateEvent. Arguments: the prefetching queue, the terminate event.
+ // NOTE(review): the Chinese wording says the events failed to be "sealed", while the English
+ // text says "emit" — confirm the terminology is intentionally aligned with the sealing messages.
+ public static final String
EXCEPTION_EMIT_EVENTS_BEFORE_COMMIT_TERMINATE_EVENT =
+ "订阅:SubscriptionPrefetchingQueue {} 在提交 PipeTerminateEvent {} 前封存剩余事件失败";
+ // Chinese translation of the INFO message logged when a PipeTerminateEvent is committed.
+ // Arguments: the prefetching queue, the terminate event.
+ public static final String COMMIT_TERMINATE_EVENT =
+ "订阅:SubscriptionPrefetchingQueue {} 提交 PipeTerminateEvent {}";
//
---------------------------------------------------------------------------
// consensus – BaseStateMachine
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 086fde0fbcc..8fc7227fd15 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -117,6 +117,7 @@ public abstract class SubscriptionPrefetchingQueue {
private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
private volatile RetryableEvent<TabletInsertionEvent>
currentTabletInsertionEvent;
private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator;
+ // Terminate event taken from the input queue but not yet committed. It stays parked here until
+ // the buffered batches are emitted and nothing remains in prefetchingQueue or inFlightEvents.
+ // NOTE(review): unlike the volatile current* fields above, this field is not volatile; the
+ // accesses shown in this patch are in synchronized methods — confirm the cleanup path is too.
+ private PipeTerminateEvent currentTerminateEvent;
public SubscriptionPrefetchingQueue(
final String brokerId,
@@ -175,6 +176,10 @@ public abstract class SubscriptionPrefetchingQueue {
.clearReferenceCount(this.getClass().getName());
currentTabletInsertionEvent = null;
}
+ if (Objects.nonNull(currentTerminateEvent)) {
+ currentTerminateEvent.clearReferenceCount(this.getClass().getName());
+ currentTerminateEvent = null;
+ }
}
///////////////////////////////// lock /////////////////////////////////
@@ -236,44 +241,8 @@ public abstract class SubscriptionPrefetchingQueue {
onEvent();
}
- final long size = prefetchingQueue.size();
- long count = 0;
-
- SubscriptionEvent event;
try {
- while (count++ < size // limit control
- && Objects.nonNull(
- event =
- prefetchingQueue.poll(
-
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
- TimeUnit.MILLISECONDS))) {
- if (event.isCommitted()) {
- LOGGER.warn(
- "Subscription: SubscriptionPrefetchingQueue {} poll committed
event {} from prefetching queue (broken invariant), remove it",
- this,
- event);
- // no need to update inFlightEvents
- continue;
- }
-
- if (!event.pollable()) {
- LOGGER.warn(
- "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable
event {} from prefetching queue (broken invariant), nack and remove it",
- this,
- event);
- event.nack(); // now pollable
- // no need to update inFlightEvents and prefetchingQueue
- continue;
- }
-
- // This operation should be performed before updating inFlightEvents
to prevent multiple
- // consumers from consuming the same event.
- event.recordLastPolledTimestamp(); // now non-pollable
-
- inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()),
event);
- event.recordLastPolledConsumerId(consumerId);
- return event;
- }
+ return pollPrefetchedEvent(consumerId);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
@@ -307,40 +276,8 @@ public abstract class SubscriptionPrefetchingQueue {
onEvent();
}
- final long size = prefetchingQueue.size();
- long count = 0;
-
- while (count++ < size // limit control
- && Objects.nonNull(
- event =
- prefetchingQueue.poll(
-
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
- TimeUnit.MILLISECONDS))) {
- if (event.isCommitted()) {
- LOGGER.warn(
- "Subscription: SubscriptionPrefetchingQueue {} poll committed
event {} from prefetching queue (broken invariant), remove it",
- this,
- event);
- // no need to update inFlightEvents
- continue;
- }
-
- if (!event.pollable()) {
- LOGGER.warn(
- "Subscription: SubscriptionPrefetchingQueue {} poll
non-pollable event {} from prefetching queue (broken invariant), nack and
remove it",
- this,
- event);
- event.nack(); // now pollable
- // no need to update inFlightEvents and prefetchingQueue
- continue;
- }
-
- // This operation should be performed before updating inFlightEvents
to prevent multiple
- // consumers from consuming the same event.
- event.recordLastPolledTimestamp(); // now non-pollable
-
- inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()),
event);
- event.recordLastPolledConsumerId(consumerId);
+ event = pollPrefetchedEvent(consumerId);
+ if (Objects.nonNull(event)) {
return event;
}
} catch (final InterruptedException e) {
@@ -356,6 +293,49 @@ public abstract class SubscriptionPrefetchingQueue {
return null;
}
+  /**
+   * Hands the next usable prefetched event to {@code consumerId}, or returns {@code null}.
+   *
+   * <p>At most as many entries as the prefetching queue held on entry are examined. Committed
+   * entries are dropped, non-pollable entries are nacked and dropped, and the first usable entry is
+   * stamped, registered in {@code inFlightEvents} and returned. Being {@code synchronized}, the
+   * move from {@code prefetchingQueue} to {@code inFlightEvents} cannot interleave with the other
+   * synchronized methods of this queue that inspect both collections.
+   *
+   * @param consumerId the consumer that receives the event
+   * @return the polled event, or {@code null} if none was usable
+   * @throws InterruptedException if interrupted while waiting on the prefetching queue
+   */
+  private synchronized SubscriptionEvent pollPrefetchedEvent(final String consumerId)
+      throws InterruptedException {
+    // Limit control: never look at more entries than were present when we started.
+    final long budget = prefetchingQueue.size();
+    for (long examined = 0; examined < budget; examined++) {
+      final SubscriptionEvent candidate =
+          prefetchingQueue.poll(
+              SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
+              TimeUnit.MILLISECONDS);
+      if (Objects.isNull(candidate)) {
+        break;
+      }
+
+      if (candidate.isCommitted()) {
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it",
+            this,
+            candidate);
+        // Already committed: just drop it, inFlightEvents needs no update.
+        continue;
+      }
+
+      if (!candidate.pollable()) {
+        LOGGER.warn(
+            "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it",
+            this,
+            candidate);
+        candidate.nack(); // now pollable
+        // Dropped here; neither inFlightEvents nor prefetchingQueue is updated.
+        continue;
+      }
+
+      // Stamp the poll time (making the event non-pollable) before publishing it in
+      // inFlightEvents, so several consumers cannot consume the same event.
+      candidate.recordLastPolledTimestamp();
+      inFlightEvents.put(new Pair<>(consumerId, candidate.getCommitContext()), candidate);
+      candidate.recordLastPolledConsumerId(consumerId);
+      return candidate;
+    }
+
+    return null;
+  }
+
/////////////////////////////// prefetch ///////////////////////////////
public boolean executePrefetch() {
@@ -366,7 +346,12 @@ public abstract class SubscriptionPrefetchingQueue {
}
reportStateIfNeeded();
// TODO: more refined behavior (prefetch/serialize/...) control
- if (states.shouldPrefetch()) {
+ if (hasCurrentTerminateEvent()) {
+ tryCommitCurrentTerminateEventIfPresent();
+ remapInFlightEventsSnapshot(
+ committedCleaner, pollableNacker, responsePrefetcher,
responseSerializer);
+ return true;
+ } else if (states.shouldPrefetch()) {
tryPrefetch();
remapInFlightEventsSnapshot(
committedCleaner, pollableNacker, responsePrefetcher,
responseSerializer);
@@ -467,6 +452,10 @@ public abstract class SubscriptionPrefetchingQueue {
* {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
*/
private synchronized void tryPrefetch() {
+ if (Objects.nonNull(currentTerminateEvent) &&
!tryCommitCurrentTerminateEvent()) {
+ return;
+ }
+
while (!inputPendingQueue.isEmpty() ||
Objects.nonNull(currentTabletInsertionEvent)) {
if (Objects.nonNull(currentTabletInsertionEvent)) {
final RetryableState state =
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
@@ -497,16 +486,10 @@ public abstract class SubscriptionPrefetchingQueue {
}
if (event instanceof PipeTerminateEvent) {
- final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
- // add mark completed hook
- terminateEvent.addOnCommittedHook(this::markCompleted);
- // commit directly
- ((PipeTerminateEvent) event)
-
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
- LOGGER.info(
- "Subscription: SubscriptionPrefetchingQueue {} commit
PipeTerminateEvent {}",
- this,
- terminateEvent);
+ currentTerminateEvent = (PipeTerminateEvent) event;
+ if (!tryCommitCurrentTerminateEvent()) {
+ return;
+ }
continue;
}
@@ -549,6 +532,11 @@ public abstract class SubscriptionPrefetchingQueue {
}
private synchronized void tryPrefetchV2() {
+ if (Objects.nonNull(currentTerminateEvent)) {
+ tryCommitCurrentTerminateEvent();
+ return;
+ }
+
if (!prefetchingQueue.isEmpty()) {
return;
}
@@ -613,16 +601,8 @@ public abstract class SubscriptionPrefetchingQueue {
}
if (event instanceof PipeTerminateEvent) {
- final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
- // add mark completed hook
- terminateEvent.addOnCommittedHook(this::markCompleted);
- // commit directly
- ((PipeTerminateEvent) event)
-
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
- LOGGER.info(
- "Subscription: SubscriptionPrefetchingQueue {} commit
PipeTerminateEvent {}",
- this,
- terminateEvent);
+ currentTerminateEvent = (PipeTerminateEvent) event;
+ tryCommitCurrentTerminateEvent();
return;
}
@@ -731,6 +711,41 @@ public abstract class SubscriptionPrefetchingQueue {
return batches.onEvent(this::prefetchEvent);
}
+  /** Reports whether a terminate event is parked and still waiting to be committed. */
+  private synchronized boolean hasCurrentTerminateEvent() {
+    return currentTerminateEvent != null;
+  }
+
+  /**
+   * Attempts to commit the parked terminate event when one exists. The attempt's result is not
+   * needed here: if the event cannot be committed yet, it simply stays parked.
+   */
+  private synchronized void tryCommitCurrentTerminateEventIfPresent() {
+    if (currentTerminateEvent == null) {
+      return;
+    }
+    tryCommitCurrentTerminateEvent();
+  }
+
+ private synchronized boolean tryCommitCurrentTerminateEvent() {
+ try {
+ batches.emitAll(this::prefetchEvent);
+ } catch (final Exception e) {
+ LOGGER.warn(
+
DataNodeMiscMessages.EXCEPTION_EMIT_EVENTS_BEFORE_COMMIT_TERMINATE_EVENT,
+ this,
+ currentTerminateEvent,
+ e);
+ return false;
+ }
+
+ if (!prefetchingQueue.isEmpty() || !inFlightEvents.isEmpty()) {
+ return false;
+ }
+
+ // Add mark completed hook only when all subscription events have been
consumed.
+ currentTerminateEvent.addOnCommittedHook(this::markCompleted);
+ currentTerminateEvent.decreaseReferenceCount(
+ SubscriptionPrefetchingQueue.class.getName(), true);
+ LOGGER.info(DataNodeMiscMessages.COMMIT_TERMINATE_EVENT, this,
currentTerminateEvent);
+ currentTerminateEvent = null;
+ return true;
+ }
+
/////////////////////////////// commit ///////////////////////////////
/**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
index d25573add6b..0707d979d6a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java
@@ -71,14 +71,7 @@ public abstract class SubscriptionPipeEventBatch {
protected synchronized boolean onEvent(final Consumer<SubscriptionEvent>
consumer)
throws Exception {
if (shouldEmit() && !enrichedEvents.isEmpty()) {
- if (Objects.isNull(events)) {
- events = generateSubscriptionEvents();
- }
- if (Objects.nonNull(events)) {
- events.forEach(consumer);
- return true;
- }
- return false;
+ // Thresholds satisfied: delegate lazy generation and delivery to emit(). emit() performs no
+ // shouldEmit() check itself, which lets SubscriptionPipeEventBatches.emitAll call it directly.
+ return emit(consumer);
}
return false;
}
@@ -101,6 +94,20 @@ public abstract class SubscriptionPipeEventBatch {
return onEvent(consumer);
}
+ protected synchronized boolean emit(final Consumer<SubscriptionEvent>
consumer) throws Exception {
+ if (enrichedEvents.isEmpty()) {
+ return false;
+ }
+ if (Objects.isNull(events)) {
+ events = generateSubscriptionEvents();
+ }
+ if (Objects.nonNull(events)) {
+ events.forEach(consumer);
+ return true;
+ }
+ return false;
+ }
+
/////////////////////////////// utility ///////////////////////////////
protected abstract void onTabletInsertionEvent(final TabletInsertionEvent
event);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index 467f788f797..0cb81ca43e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -142,6 +142,35 @@ public class SubscriptionPipeEventBatches {
return hasNew.get();
}
+ public boolean emitAll(final Consumer<SubscriptionEvent> consumer) throws
Exception {
+ final AtomicBoolean hasNew = new AtomicBoolean(false);
+ Exception exception = null;
+ for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) {
+ try {
+ segmentLock.lock(regionId);
+ final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+ if (Objects.isNull(batch)) {
+ continue;
+ }
+ if (batch.emit(consumer)) {
+ hasNew.set(true);
+ regionIdToBatch.remove(regionId);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ DataNodeMiscMessages.EXCEPTION_SEALING_EVENTS,
regionIdToBatch.get(regionId), e);
+ exception = e;
+ } finally {
+ segmentLock.unlock(regionId);
+ }
+ }
+
+ if (Objects.nonNull(exception)) {
+ throw exception;
+ }
+ return hasNew.get();
+ }
+
public void cleanUp() {
regionIdToBatch.values().forEach(batch -> batch.cleanUp(true));
regionIdToBatch.clear();