This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch hook-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 230034876f910199df8509ce9568cc01b0e98947
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 19:34:22 2026 +0800

    new
---
 .../common/tablet/PipeRawTabletInsertionEvent.java | 25 ++++++++++------------
 .../task/progress/interval/PipeCommitInterval.java |  3 ++-
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 1377c415319..b72815adf9a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -85,14 +85,18 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
     this.allocatedMemoryBlock =
         
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
 
-    addOnCommittedHook(
-        () -> {
-          if (shouldReportOnCommit) {
-            eliminateProgressIndex();
-          }
-        });
+    triggerAddHook();
   }
 
+  private void triggerAddHook() {
+    if (shouldReportOnCommit && needToReport && sourceEvent instanceof 
PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent event = ((PipeTsFileInsertionEvent) 
sourceEvent);
+      addOnCommittedHook(event::eliminateProgressIndex);
+    }
+  }
+
+
+
   public PipeRawTabletInsertionEvent(
       final Tablet tablet,
       final boolean isAligned,
@@ -181,14 +185,6 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
     return true;
   }
 
-  protected void eliminateProgressIndex() {
-    if (needToReport) {
-      if (sourceEvent instanceof PipeTsFileInsertionEvent) {
-        ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
-      }
-    }
-  }
-
   @Override
   public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
     // Normally not all events need to report progress, but if the 
overridingProgressIndex
@@ -254,6 +250,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
 
   public void markAsNeedToReport() {
     this.needToReport = true;
+    triggerAddHook();
   }
 
   // This getter is reserved for user-defined plugins
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 885df4727da..456acd646dc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -43,7 +44,7 @@ public class PipeCommitInterval extends 
Interval<PipeCommitInterval> {
     this.pipeTaskMeta = pipeTaskMeta;
     this.currentIndex =
         Objects.nonNull(currentIndex) ? currentIndex : 
MinimumProgressIndex.INSTANCE;
-    this.onCommittedHooks = onCommittedHooks;
+    this.onCommittedHooks = new ArrayList<>(onCommittedHooks);
   }
 
   @Override

Reply via email to