This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f2029f86ce1 Subscription: distinguish between reference count of ack 
and clean in tsfile batch to avoid cleaning before ack (#15229) (#15233)
f2029f86ce1 is described below

commit f2029f86ce173678b9d57d55e24773a80d8b07a6
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 31 12:17:30 2025 +0800

    Subscription: distinguish between reference count of ack and clean in 
tsfile batch to avoid cleaning before ack (#15229) (#15233)
---
 .../event/batch/SubscriptionPipeTsFileEventBatch.java    |  7 +++++--
 .../event/pipe/SubscriptionPipeTsFileBatchEvents.java    | 16 ++++++++++------
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 71671d55d95..47be439912b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -95,13 +95,16 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
 
     final List<SubscriptionEvent> events = new ArrayList<>();
     final List<File> tsFiles = batch.sealTsFiles();
-    final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size());
+    final AtomicInteger ackReferenceCount = new AtomicInteger(tsFiles.size());
+    final AtomicInteger cleanReferenceCount = new 
AtomicInteger(tsFiles.size());
     for (final File tsFile : tsFiles) {
       final SubscriptionCommitContext commitContext =
           prefetchingQueue.generateSubscriptionCommitContext();
       events.add(
           new SubscriptionEvent(
-              new SubscriptionPipeTsFileBatchEvents(this, referenceCount), 
tsFile, commitContext));
+              new SubscriptionPipeTsFileBatchEvents(this, ackReferenceCount, 
cleanReferenceCount),
+              tsFile,
+              commitContext));
     }
     return events;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 8cf31408ec0..651763ab652 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -28,26 +28,30 @@ import static 
com.google.common.base.MoreObjects.toStringHelper;
 public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents {
 
   private final SubscriptionPipeTsFileEventBatch batch;
-  private final AtomicInteger referenceCount; // shared between the same batch
+  private final AtomicInteger ackReferenceCount; // shared between the same 
batch
+  private final AtomicInteger cleanReferenceCount; // shared between the same 
batch
   private final int count; // snapshot the initial reference count, used for 
event count calculation
 
   public SubscriptionPipeTsFileBatchEvents(
-      final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger 
referenceCount) {
+      final SubscriptionPipeTsFileEventBatch batch,
+      final AtomicInteger ackReferenceCount,
+      final AtomicInteger cleanReferenceCount) {
     this.batch = batch;
-    this.referenceCount = referenceCount;
-    this.count = Math.max(1, referenceCount.get());
+    this.ackReferenceCount = ackReferenceCount;
+    this.cleanReferenceCount = cleanReferenceCount;
+    this.count = Math.max(1, ackReferenceCount.get());
   }
 
   @Override
   public void ack() {
-    if (referenceCount.decrementAndGet() == 0) {
+    if (ackReferenceCount.decrementAndGet() == 0) {
       batch.ack();
     }
   }
 
   @Override
   public void cleanUp(final boolean force) {
-    if (referenceCount.decrementAndGet() == 0) {
+    if (cleanReferenceCount.decrementAndGet() == 0) {
       batch.cleanUp(force);
     }
   }

Reply via email to