This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f2029f86ce1 Subscription: distinguish between reference count of ack and clean in tsfile batch to avoid cleaning before ack (#15229) (#15233)
f2029f86ce1 is described below
commit f2029f86ce173678b9d57d55e24773a80d8b07a6
Author: VGalaxies <[email protected]>
AuthorDate: Mon Mar 31 12:17:30 2025 +0800
Subscription: distinguish between reference count of ack and clean in tsfile batch to avoid cleaning before ack (#15229) (#15233)
---
.../event/batch/SubscriptionPipeTsFileEventBatch.java | 7 +++++--
.../event/pipe/SubscriptionPipeTsFileBatchEvents.java | 16 ++++++++++------
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 71671d55d95..47be439912b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -95,13 +95,16 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch
final List<SubscriptionEvent> events = new ArrayList<>();
final List<File> tsFiles = batch.sealTsFiles();
- final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size());
+ final AtomicInteger ackReferenceCount = new AtomicInteger(tsFiles.size());
+ final AtomicInteger cleanReferenceCount = new AtomicInteger(tsFiles.size());
for (final File tsFile : tsFiles) {
final SubscriptionCommitContext commitContext =
prefetchingQueue.generateSubscriptionCommitContext();
events.add(
new SubscriptionEvent(
- new SubscriptionPipeTsFileBatchEvents(this, referenceCount), tsFile, commitContext));
+ new SubscriptionPipeTsFileBatchEvents(this, ackReferenceCount, cleanReferenceCount),
+ tsFile,
+ commitContext));
}
return events;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index 8cf31408ec0..651763ab652 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -28,26 +28,30 @@ import static com.google.common.base.MoreObjects.toStringHelper;
public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents {
private final SubscriptionPipeTsFileEventBatch batch;
- private final AtomicInteger referenceCount; // shared between the same batch
+ private final AtomicInteger ackReferenceCount; // shared between the same batch
+ private final AtomicInteger cleanReferenceCount; // shared between the same batch
private final int count; // snapshot the initial reference count, used for event count calculation
public SubscriptionPipeTsFileBatchEvents(
- final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger referenceCount) {
+ final SubscriptionPipeTsFileEventBatch batch,
+ final AtomicInteger ackReferenceCount,
+ final AtomicInteger cleanReferenceCount) {
this.batch = batch;
- this.referenceCount = referenceCount;
- this.count = Math.max(1, referenceCount.get());
+ this.ackReferenceCount = ackReferenceCount;
+ this.cleanReferenceCount = cleanReferenceCount;
+ this.count = Math.max(1, ackReferenceCount.get());
}
@Override
public void ack() {
- if (referenceCount.decrementAndGet() == 0) {
+ if (ackReferenceCount.decrementAndGet() == 0) {
batch.ack();
}
}
@Override
public void cleanUp(final boolean force) {
- if (referenceCount.decrementAndGet() == 0) {
+ if (cleanReferenceCount.decrementAndGet() == 0) {
batch.cleanUp(force);
}
}