This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b1deb6c7cf9 Pipe: Optimized the commit queue to avoid OOM problem 
(#16145)
b1deb6c7cf9 is described below

commit b1deb6c7cf9e6baf2d4bed0663fb5570bf9d9789
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 12 19:14:20 2025 +0800

    Pipe: Optimized the commit queue to avoid OOM problem (#16145)
    
    * first
    
    * trial
    
    * mayfix
    
    * may_final
    
    * may_last
    
    * may_fix
    
    * partial
    
    * completion
    
    * enhance-test
    
    * fix
    
    * bug-fix-n
---
 .../common/deletion/PipeDeleteDataNodeEvent.java   |   5 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  11 +-
 .../event/common/terminate/PipeTerminateEvent.java |   5 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  13 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  12 +-
 .../agent/task/progress/PipeEventCommitter.java    |  70 +---------
 .../task/progress/interval/PipeCommitInterval.java |  79 +++++++++++
 .../task/progress/interval/PipeCommitQueue.java    |  60 +++++++++
 .../pipe/datastructure/interval/Interval.java      |  48 +++++++
 .../datastructure/interval/IntervalManager.java    |  74 +++++++++++
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  40 +++---
 .../pipe/datastructure/PipeCommitQueueTest.java    | 148 +++++++++++++++++++++
 12 files changed, 449 insertions(+), 116 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
index 692144edfb0..1225d59cf68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
@@ -80,6 +80,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent 
implements Serializab
     this.deleteDataNode = deleteDataNode;
     Optional.ofNullable(deleteDataNode)
         .ifPresent(node -> this.progressIndex = 
deleteDataNode.getProgressIndex());
+    addOnCommittedHook(this::decreaseDeletionReference);
   }
 
   public AbstractDeleteDataNode getDeleteDataNode() {
@@ -104,9 +105,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent 
implements Serializab
     return true;
   }
 
-  @Override
-  public void onCommitted() {
-    super.onCommitted();
+  public void decreaseDeletionReference() {
     if (deletionResource != null) {
       deletionResource.decreaseReference();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 349bfc2f6b0..322b5ea2435 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -108,6 +108,13 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
     // Allocate empty memory block, will be resized later.
     this.allocatedMemoryBlock =
         
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
+    addOnCommittedHook(
+        () -> {
+          if (shouldReportOnCommit) {
+            eliminateProgressIndex();
+          }
+        });
   }
 
   public PipeRawTabletInsertionEvent(
@@ -259,10 +266,8 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
     return true;
   }
 
-  @Override
-  protected void reportProgress() {
+  protected void eliminateProgressIndex() {
     if (needToReport) {
-      super.reportProgress();
       if (sourceEvent instanceof PipeTsFileInsertionEvent) {
         ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index bf7b5a6f527..6e00fc8513f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -79,6 +79,8 @@ public class PipeTerminateEvent extends EnrichedEvent {
         Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.shouldMark = shouldMark;
+
+    addOnCommittedHook(this::markCompleted);
   }
 
   @Override
@@ -127,8 +129,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
     return true;
   }
 
-  @Override
-  public void reportProgress() {
+  public void markCompleted() {
     // To avoid deadlock
     if (shouldMark) {
       terminateExecutor.submit(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 5db40e82786..454c9cf0063 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -207,6 +207,13 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     isClosed.set(resource.isClosed());
 
     this.eventParser = new AtomicReference<>(null);
+
+    addOnCommittedHook(
+        () -> {
+          if (shouldReportOnCommit) {
+            eliminateProgressIndex();
+          }
+        });
   }
 
   /**
@@ -371,12 +378,6 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     return resource.getMaxProgressIndex();
   }
 
-  @Override
-  protected void reportProgress() {
-    super.reportProgress();
-    this.eliminateProgressIndex();
-  }
-
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
       PipeTsFileEpochProgressIndexKeeper.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 3dd9e0d54fb..903f59e4225 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -499,11 +499,7 @@ public abstract class SubscriptionPrefetchingQueue {
       if (event instanceof PipeTerminateEvent) {
         final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
         // add mark completed hook
-        terminateEvent.addOnCommittedHook(
-            () -> {
-              markCompleted();
-              return null;
-            });
+        terminateEvent.addOnCommittedHook(this::markCompleted);
         // commit directly
         ((PipeTerminateEvent) event)
             
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
@@ -619,11 +615,7 @@ public abstract class SubscriptionPrefetchingQueue {
     if (event instanceof PipeTerminateEvent) {
       final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
       // add mark completed hook
-      terminateEvent.addOnCommittedHook(
-          () -> {
-            markCompleted();
-            return null;
-          });
+      terminateEvent.addOnCommittedHook(this::markCompleted);
       // commit directly
       ((PipeTerminateEvent) event)
           
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
index 61f23907819..42429fca253 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
@@ -19,33 +19,20 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.progress;
 
+import 
org.apache.iotdb.commons.pipe.agent.task.progress.interval.PipeCommitQueue;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Comparator;
-import java.util.Objects;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 /** Used to queue {@link Event}s for one pipe of one region to commit in 
order. */
 public class PipeEventCommitter {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCommitter.class);
-
   private final CommitterKey committerKey;
 
   private final AtomicLong commitIdGenerator = new AtomicLong(0);
-  private final AtomicLong lastCommitId = new AtomicLong(0);
 
-  private final PriorityBlockingQueue<EnrichedEvent> commitQueue =
-      new PriorityBlockingQueue<>(
-          11,
-          Comparator.comparing(
-              event ->
-                  Objects.requireNonNull(event, "committable event cannot be 
null").getCommitId()));
+  private final PipeCommitQueue commitQueue = new PipeCommitQueue();
 
   PipeEventCommitter(final CommitterKey committerKey) {
     // make it package-private
@@ -64,55 +51,6 @@ public class PipeEventCommitter {
       }
     }
     commitQueue.offer(event);
-
-    final int commitQueueSizeBeforeCommit = commitQueue.size();
-    if (LOGGER.isDebugEnabled()) {
-      if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit % 
100 == 0) {
-        LOGGER.info(
-            "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last 
commit id {}, commit queue size {}",
-            committerKey,
-            event.getCommitId(),
-            lastCommitId.get(),
-            commitQueueSizeBeforeCommit);
-      } else {
-        LOGGER.debug(
-            "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last 
commit id {}, commit queue size {}",
-            committerKey,
-            event.getCommitId(),
-            lastCommitId.get(),
-            commitQueueSizeBeforeCommit);
-      }
-    }
-
-    while (!commitQueue.isEmpty()) {
-      final EnrichedEvent e = commitQueue.peek();
-
-      if (e.getCommitId() <= lastCommitId.get()) {
-        LOGGER.info(
-            "commit id is not monotonically increasing, current commit id: {}, 
last commit id: {}, event: {}, may be because the tsFile has been compacted",
-            e.getCommitId(),
-            lastCommitId.get(),
-            e.coreReportMessage());
-        commitQueue.poll();
-        continue;
-      }
-
-      if (e.getCommitId() != lastCommitId.get() + 1) {
-        break;
-      }
-
-      e.onCommitted();
-      lastCommitId.incrementAndGet();
-      commitQueue.poll();
-
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "COMMIT QUEUE POLL: committer key {}, last commit id {}, commit 
queue size after commit {}",
-            committerKey,
-            lastCommitId.get(),
-            commitQueue.size());
-      }
-    }
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
@@ -133,8 +71,4 @@ public class PipeEventCommitter {
   public long commitQueueSize() {
     return commitQueue.size();
   }
-
-  public long getCurrentCommitId() {
-    return commitIdGenerator.get();
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
new file mode 100644
index 00000000000..7ee8008c0e0
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.progress.interval;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
+
+import java.util.List;
+import java.util.Objects;
+
+public class PipeCommitInterval extends Interval<PipeCommitInterval> {
+
+  private ProgressIndex currentIndex;
+  private List<Runnable> onCommittedHooks;
+  private final PipeTaskMeta pipeTaskMeta;
+
+  public PipeCommitInterval(
+      final long s,
+      final long e,
+      final ProgressIndex currentIndex,
+      final List<Runnable> onCommittedHooks,
+      final PipeTaskMeta pipeTaskMeta) {
+    super(s, e);
+    this.pipeTaskMeta = pipeTaskMeta;
+    this.currentIndex = currentIndex;
+    this.onCommittedHooks = onCommittedHooks;
+  }
+
+  @Override
+  public void onMerged(final PipeCommitInterval another) {
+    currentIndex = 
currentIndex.updateToMinimumEqualOrIsAfterProgressIndex(another.currentIndex);
+
+    // Keep in order
+    if (this.start <= another.start) {
+      onCommittedHooks.addAll(another.onCommittedHooks);
+    } else {
+      // Note that if merged, another interval is not supposed to be reused
+      // thus we can arbitrarily alter its hooks
+      another.onCommittedHooks.addAll(this.onCommittedHooks);
+      this.onCommittedHooks = another.onCommittedHooks;
+    }
+  }
+
+  @Override
+  public void onRemoved() {
+    if (Objects.nonNull(pipeTaskMeta)) {
+      pipeTaskMeta.updateProgressIndex(currentIndex);
+    }
+    onCommittedHooks.forEach(Runnable::run);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeCommitInterval{"
+        + "progressIndex='"
+        + currentIndex
+        + "', range="
+        + super.toString()
+        + "}";
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
new file mode 100644
index 00000000000..7b48aa39c9a
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.progress.interval;
+
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.datastructure.interval.IntervalManager;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+public class PipeCommitQueue {
+  private final IntervalManager<PipeCommitInterval> intervalManager = new 
IntervalManager<>();
+  private long lastCommitted = 0;
+
+  public void offer(final EnrichedEvent event) {
+    final PipeCommitInterval interval =
+        new PipeCommitInterval(
+            event.getCommitId(),
+            event.getCommitId(),
+            event.isShouldReportOnCommit()
+                ? event.getProgressIndex()
+                : MinimumProgressIndex.INSTANCE,
+            event.getOnCommittedHooks(),
+            event.getPipeTaskMeta());
+    intervalManager.addInterval(interval);
+    if (interval.start == lastCommitted + 1) {
+      intervalManager.remove(interval);
+      lastCommitted = interval.end;
+    }
+  }
+
+  public int size() {
+    return intervalManager.size();
+  }
+
+  @Override
+  public String toString() {
+    return "PipeCommitQueue{"
+        + "lastCommitted='"
+        + lastCommitted
+        + "', IntervalManager="
+        + intervalManager
+        + "}";
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
new file mode 100644
index 00000000000..67ecaa66d6e
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.interval;
+
+public class Interval<T extends Interval<T>> implements 
Comparable<Interval<?>> {
+  public long start;
+  public long end;
+
+  public Interval(final long s, final long e) {
+    start = s;
+    end = e;
+  }
+
+  public void onMerged(final T another) {
+    // Do nothing by default
+  }
+
+  public void onRemoved() {
+    // Do nothing by default
+  }
+
+  @Override
+  public int compareTo(final Interval other) {
+    return Long.compare(this.start, other.start);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + start + ", " + end + "]";
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
new file mode 100644
index 00000000000..179a379b7c3
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.interval;
+
+import java.util.TreeSet;
+
+public class IntervalManager<T extends Interval<T>> {
+  private final TreeSet<T> intervals = new TreeSet<>();
+
+  // insert into new interval and merge
+  public void addInterval(final T newInterval) {
+    // Left closest
+    final T left = intervals.floor(newInterval);
+
+    // Right closest
+    final T right = intervals.ceiling(newInterval);
+
+    // Merge left ([0,1] + [2,3] → [0,3])
+    if (left != null && left.end >= newInterval.start - 1) {
+      newInterval.start = Math.min(left.start, newInterval.start);
+      newInterval.end = Math.max(left.end, newInterval.end);
+      newInterval.onMerged(left);
+      intervals.remove(left);
+    }
+
+    // Merge right ([2,3] + [3,4] → [2,4])
+    if (right != null && newInterval.end >= right.start - 1) {
+      newInterval.start = Math.min(newInterval.start, right.start);
+      newInterval.end = Math.max(newInterval.end, right.end);
+      newInterval.onMerged(right);
+      intervals.remove(right);
+    }
+
+    intervals.add(newInterval);
+  }
+
+  public T peek() {
+    return intervals.first();
+  }
+
+  public boolean remove(final T interval) {
+    if (intervals.remove(interval)) {
+      interval.onRemoved();
+      return true;
+    }
+    return false;
+  }
+
+  public int size() {
+    return intervals.size();
+  }
+
+  @Override
+  public String toString() {
+    return "IntervalManager{" + "intervals=" + intervals + '}';
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index c7e0345cc59..1f1686f1eb5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
@@ -38,7 +38,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
 
 /**
  * {@link EnrichedEvent} is an {@link Event} that can be enriched with 
additional runtime
@@ -78,7 +77,7 @@ public abstract class EnrichedEvent implements Event {
   protected boolean isTimeParsed;
 
   protected volatile boolean shouldReportOnCommit = true;
-  protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
+  protected List<Runnable> onCommittedHooks = new ArrayList<>();
   protected String userName;
   protected boolean skipIfNoPrivileges;
 
@@ -110,14 +109,6 @@ public abstract class EnrichedEvent implements Event {
             && (tablePattern == null
                 || 
!tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern());
     isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
-
-    addOnCommittedHook(
-        () -> {
-          if (shouldReportOnCommit) {
-            reportProgress();
-          }
-          return null;
-        });
   }
 
   protected void trackResource() {
@@ -283,14 +274,6 @@ public abstract class EnrichedEvent implements Event {
    */
   public abstract boolean internallyDecreaseResourceReferenceCount(final 
String holderMessage);
 
-  protected void reportProgress() {
-    if (pipeTaskMeta != null) {
-      final ProgressIndex progressIndex = getProgressIndex();
-      pipeTaskMeta.updateProgressIndex(
-          progressIndex == null ? MinimumProgressIndex.INSTANCE : 
progressIndex);
-    }
-  }
-
   /**
    * Externally skip the report of the processing {@link ProgressIndex} of 
this {@link
    * EnrichedEvent} when committed. Report by generated events are still 
allowed.
@@ -400,6 +383,19 @@ public abstract class EnrichedEvent implements Event {
       final long startTime,
       final long endTime);
 
+  @TestOnly
+  public void setShouldReportOnCommit(final boolean shouldReportOnCommit) {
+    this.shouldReportOnCommit = shouldReportOnCommit;
+  }
+
+  public boolean isShouldReportOnCommit() {
+    return shouldReportOnCommit;
+  }
+
+  public List<Runnable> getOnCommittedHooks() {
+    return onCommittedHooks;
+  }
+
   public PipeTaskMeta getPipeTaskMeta() {
     return pipeTaskMeta;
   }
@@ -468,11 +464,7 @@ public abstract class EnrichedEvent implements Event {
     this.replicateIndexForIoTV2 = replicateIndexForIoTV2;
   }
 
-  public void onCommitted() {
-    onCommittedHooks.forEach(Supplier::get);
-  }
-
-  public void addOnCommittedHook(final Supplier<Void> hook) {
+  public void addOnCommittedHook(final Runnable hook) {
     onCommittedHooks.add(hook);
   }
 
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
new file mode 100644
index 00000000000..651042f9605
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import 
org.apache.iotdb.commons.pipe.agent.task.progress.interval.PipeCommitQueue;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PipeCommitQueueTest {
+
+  private final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+  private final Set<Integer> commitHookTestSet = new HashSet<>();
+
+  @Test
+  public void testCommitQueue() {
+    final PipeCommitQueue pipeCommitQueue = new PipeCommitQueue();
+    pipeCommitQueue.offer(new TestEnrichedEvent(1, new IoTProgressIndex(0, 
1L)));
+    pipeCommitQueue.offer(new TestEnrichedEvent(3, new IoTProgressIndex(0, 
3L)));
+    Assert.assertEquals(1, pipeCommitQueue.size());
+    Assert.assertEquals(new IoTProgressIndex(0, 1L), 
pipeTaskMeta.getProgressIndex());
+    TestEnrichedEvent nextEvent = new TestEnrichedEvent(5, new 
IoTProgressIndex(0, 5L));
+    nextEvent.setShouldReportOnCommit(false);
+    pipeCommitQueue.offer(nextEvent);
+    pipeCommitQueue.offer(new TestEnrichedEvent(4, new IoTProgressIndex(0, 
4L)));
+    Assert.assertEquals(1, pipeCommitQueue.size());
+    nextEvent = new TestEnrichedEvent(2, new IoTProgressIndex(0, 2L));
+    nextEvent.addOnCommittedHook(() -> commitHookTestSet.add(1));
+    pipeCommitQueue.offer(nextEvent);
+    Assert.assertEquals(0, pipeCommitQueue.size());
+    Assert.assertEquals(new IoTProgressIndex(0, 4L), 
pipeTaskMeta.getProgressIndex());
+    Assert.assertEquals(1, commitHookTestSet.size());
+  }
+
+  private class TestEnrichedEvent extends EnrichedEvent {
+    private ProgressIndex progressIndex;
+
+    private TestEnrichedEvent(final long commitId, final ProgressIndex 
progressIndex) {
+      this(
+          null,
+          0,
+          PipeCommitQueueTest.this.pipeTaskMeta,
+          null,
+          null,
+          null,
+          false,
+          Long.MIN_VALUE,
+          Long.MAX_VALUE);
+      this.commitId = commitId;
+      this.progressIndex = progressIndex;
+    }
+
+    @Override
+    public ProgressIndex getProgressIndex() {
+      return progressIndex;
+    }
+
+    /////////////////////////////// Useless logic 
///////////////////////////////
+
+    protected TestEnrichedEvent(
+        final String pipeName,
+        final long creationTime,
+        final PipeTaskMeta pipeTaskMeta,
+        final TreePattern treePattern,
+        final TablePattern tablePattern,
+        final String userName,
+        final boolean skipIfNoPrivileges,
+        final long startTime,
+        final long endTime) {
+      super(
+          pipeName,
+          creationTime,
+          pipeTaskMeta,
+          treePattern,
+          tablePattern,
+          userName,
+          skipIfNoPrivileges,
+          startTime,
+          endTime);
+    }
+
+    @Override
+    public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+      return false;
+    }
+
+    @Override
+    public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+      return false;
+    }
+
+    @Override
+    public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+        final String pipeName,
+        final long creationTime,
+        final PipeTaskMeta pipeTaskMeta,
+        final TreePattern treePattern,
+        final TablePattern tablePattern,
+        final String userName,
+        final boolean skipIfNoPrivileges,
+        final long startTime,
+        final long endTime) {
+      return null;
+    }
+
+    @Override
+    public boolean isGeneratedByPipe() {
+      return false;
+    }
+
+    @Override
+    public boolean mayEventTimeOverlappedWithTimeRange() {
+      return false;
+    }
+
+    @Override
+    public boolean mayEventPathsOverlappedWithPattern() {
+      return false;
+    }
+  }
+}

Reply via email to