This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c8bff0002bd Subscription: retain tsfile events in tsfile batch to
avoid premature commit (#15598) (#15601)
c8bff0002bd is described below
commit c8bff0002bd8345b1cc2e1f8a1a2712fa2f3c1dd
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 18:21:10 2025 +0800
Subscription: retain tsfile events in tsfile batch to avoid premature
commit (#15598) (#15601)
Consider the historical data export snapshot scenario:
1. The events delivered upstream are, in order, the tsfile event and the
termination event.
2. The tsfile event is parsed into multiple tablet events, and then the
reference count of the tsfile event is set to 0 (should report as false).
3. Assuming that for some reason the tablet events were not sent to the
peer in time, the reference count of the transfer termination event is set to 0
(should report as true).
4. At this point, because the tablet events were not enriched with a commit
id (see Subscription: fully managed tsfile parsing process for tsfile format
topic #15524), the termination event successfully marks the corresponding DR
complete, which in turn leads to data loss.
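
The scenario above can be modelled with a small, self-contained sketch.
This is not the IoTDB implementation: RefCountedEvent, the shared
"outstanding" counter, and terminationCompletesBeforeAck are hypothetical
names used only for illustration, and the real commit-id bookkeeping is
reduced here to a single counter of events that still block completion.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetainUntilAckSketch {

  /** Hypothetical stand-in for a reference-counted event (not the IoTDB EnrichedEvent API). */
  static final class RefCountedEvent {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final AtomicInteger outstanding;

    RefCountedEvent(final AtomicInteger outstanding) {
      this.outstanding = outstanding;
      // A live event blocks completion until its reference count reaches 0.
      outstanding.incrementAndGet();
    }

    void release() {
      if (refCount.decrementAndGet() == 0) {
        outstanding.decrementAndGet();
      }
    }
  }

  /**
   * Replays steps 1-4 and reports whether the termination event could complete
   * before the consumer acknowledged the tablets parsed from the tsfile.
   */
  static boolean terminationCompletesBeforeAck(final boolean retainTsFileUntilAck) {
    final AtomicInteger outstanding = new AtomicInteger();
    final RefCountedEvent tsFileEvent = new RefCountedEvent(outstanding);

    // Step 2: the tsfile is parsed into tablets. The tablets are assumed to be
    // untracked here, mirroring the missing commit id in the description.
    if (!retainTsFileUntilAck) {
      tsFileEvent.release(); // old behaviour: release right after parsing
    }

    // Steps 3-4: the termination event is processed while tablets are still in flight.
    final boolean completedEarly = outstanding.get() == 0;

    // The consumer acknowledges later; the retained tsfile event is released only now.
    if (retainTsFileUntilAck) {
      tsFileEvent.release();
    }
    return completedEarly;
  }

  public static void main(final String[] args) {
    System.out.println("release after parsing: " + terminationCompletesBeforeAck(false));
    System.out.println("retain until ack:      " + terminationCompletesBeforeAck(true));
  }
}
```

In this simplified model, releasing the tsfile event right after parsing
lets the termination event complete while tablets are still unacknowledged,
whereas holding the event until ack keeps completion blocked.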
---
.../event/batch/SubscriptionPipeTsFileEventBatch.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index b4782507e47..7b1a6caf610 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -57,12 +57,22 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch
@Override
public synchronized void ack() {
batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
+ enrichedEvents.stream()
+ // only decrease reference count for tsfile event, since we already decrease reference count
+ // for tablet event in batch
+ .filter(event -> event instanceof PipeTsFileInsertionEvent)
+ .forEach(event -> event.decreaseReferenceCount(this.getClass().getName(), true));
}
@Override
public synchronized void cleanUp(final boolean force) {
// close batch, it includes clearing the reference count of events
batch.close();
+
+ // clear the reference count of events
+ for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ enrichedEvent.clearReferenceCount(this.getClass().getName());
+ }
enrichedEvents.clear();
}
@@ -102,7 +112,6 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch
} finally {
try {
event.close();
- ((PipeTsFileInsertionEvent) event).decreaseReferenceCount(this.getClass().getName(), false);
} catch (final Exception ignored) {
// no exceptions will be thrown
}