This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ec0fda2d1 Subscription: distinguish between reference count of ack 
and clean in tsfile batch to avoid cleaning before ack (#15229)
c9ec0fda2d1 is described below

commit c9ec0fda2d183192a428088c69aff3cdcbbe1acb
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 31 10:51:06 2025 +0800

    Subscription: distinguish between reference count of ack and clean in 
tsfile batch to avoid cleaning before ack (#15229)
---
 .../event/batch/SubscriptionPipeTsFileEventBatch.java    |  5 +++--
 .../event/pipe/SubscriptionPipeTsFileBatchEvents.java    | 16 ++++++++++------
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index a1d3e27f6c7..1409de02bf3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -96,13 +96,14 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
 
     final List<SubscriptionEvent> events = new ArrayList<>();
     final List<Pair<String, File>> dbTsFilePairs = batch.sealTsFiles();
-    final AtomicInteger referenceCount = new 
AtomicInteger(dbTsFilePairs.size());
+    final AtomicInteger ackReferenceCount = new 
AtomicInteger(dbTsFilePairs.size());
+    final AtomicInteger cleanReferenceCount = new 
AtomicInteger(dbTsFilePairs.size());
     for (final Pair<String, File> tsFile : dbTsFilePairs) {
       final SubscriptionCommitContext commitContext =
           prefetchingQueue.generateSubscriptionCommitContext();
       events.add(
           new SubscriptionEvent(
-              new SubscriptionPipeTsFileBatchEvents(this, referenceCount),
+              new SubscriptionPipeTsFileBatchEvents(this, ackReferenceCount, 
cleanReferenceCount),
               tsFile.right,
               commitContext));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 8cf31408ec0..651763ab652 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -28,26 +28,30 @@ import static 
com.google.common.base.MoreObjects.toStringHelper;
 public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTsFileEventBatch batch;
-  private final AtomicInteger referenceCount; // shared between the same batch
+  private final AtomicInteger ackReferenceCount; // shared between the same 
batch
+  private final AtomicInteger cleanReferenceCount; // shared between the same 
batch
   private final int count; // snapshot the initial reference count, used for 
event count calculation
 
   public SubscriptionPipeTsFileBatchEvents(
-      final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger 
referenceCount) {
+      final SubscriptionPipeTsFileEventBatch batch,
+      final AtomicInteger ackReferenceCount,
+      final AtomicInteger cleanReferenceCount) {
     this.batch = batch;
-    this.referenceCount = referenceCount;
-    this.count = Math.max(1, referenceCount.get());
+    this.ackReferenceCount = ackReferenceCount;
+    this.cleanReferenceCount = cleanReferenceCount;
+    this.count = Math.max(1, ackReferenceCount.get());
   }
 
   @Override
   public void ack() {
-    if (referenceCount.decrementAndGet() == 0) {
+    if (ackReferenceCount.decrementAndGet() == 0) {
       batch.ack();
     }
   }
 
   @Override
   public void cleanUp(final boolean force) {
-    if (referenceCount.decrementAndGet() == 0) {
+    if (cleanReferenceCount.decrementAndGet() == 0) {
       batch.cleanUp(force);
     }
   }

Reply via email to