This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch hook
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/hook by this push:
new c69bce8f3be new
c69bce8f3be is described below
commit c69bce8f3bef42b4b31f6992bd9b4816c708c589
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 19:34:22 2026 +0800
new
---
.../common/tablet/PipeRawTabletInsertionEvent.java | 25 ++++++++++------------
.../task/progress/interval/PipeCommitInterval.java | 3 ++-
2 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index c4f900b7d5f..a857c967db0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -116,14 +116,18 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
this.allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
- addOnCommittedHook(
- () -> {
- if (shouldReportOnCommit) {
- eliminateProgressIndex();
- }
- });
+ triggerAddHook();
}
+ private void triggerAddHook() {
+ if (shouldReportOnCommit && needToReport && sourceEvent instanceof PipeTsFileInsertionEvent) {
+ final PipeTsFileInsertionEvent event = ((PipeTsFileInsertionEvent) sourceEvent);
+ addOnCommittedHook(event::eliminateProgressIndex);
+ }
+ }
+
+
+
public PipeRawTabletInsertionEvent(
final Boolean isTableModelEvent,
final String databaseName,
@@ -302,14 +306,6 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
return true;
}
- protected void eliminateProgressIndex() {
- if (needToReport) {
- if (sourceEvent instanceof PipeTsFileInsertionEvent) {
- ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
- }
- }
- }
-
@Override
public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
// Normally not all events need to report progress, but if the overridingProgressIndex
@@ -388,6 +384,7 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
public void markAsNeedToReport() {
this.needToReport = true;
+ triggerAddHook();
}
// This getter is reserved for user-defined plugins
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 885df4727da..456acd646dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -43,7 +44,7 @@ public class PipeCommitInterval extends Interval<PipeCommitInterval> {
this.pipeTaskMeta = pipeTaskMeta;
this.currentIndex =
Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE;
- this.onCommittedHooks = onCommittedHooks;
+ this.onCommittedHooks = new ArrayList<>(onCommittedHooks);
}
@Override