This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch ts-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ts-fix by this push:
     new 8e4bb49de10 fix
8e4bb49de10 is described below

commit 8e4bb49de10bb04e6a1429579107b87508bc6bcf
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 8 19:02:19 2026 +0800

    fix
---
 .../source/dataregion/realtime/PipeRealtimeDataRegionSource.java   | 7 +++++++
 .../pipe/agent/task/connection/UnboundedBlockingPendingQueue.java  | 4 ++++
 2 files changed, 11 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 1e47a48e500..718529243d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -427,6 +427,13 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
   }
 
   protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+    // Remove any heartbeat events in front of this event to avoid OOM
+    // Since the batch and retry queue no longer need the heartbeat event to trigger
+    // And the progress report event can trigger the processor calculation because it's not reported
+    // yet
+    while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof PipeHeartbeatEvent) {
+      pendingQueue.pollLast();
+    }
     if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
       final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
       oldEvent.bindProgressIndex(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
index 785e89cfb9a..43fa64c158e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
@@ -37,4 +37,8 @@ public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPend
   public E peekLast() {
     return pendingDeque.peekLast();
   }
+
+  public E pollLast() {
+    return pendingDeque.pollLast();
+  }
 }

Reply via email to