This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch ts-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ts-fix by this push:
new 8e4bb49de10 fix
8e4bb49de10 is described below
commit 8e4bb49de10bb04e6a1429579107b87508bc6bcf
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 8 19:02:19 2026 +0800
fix
---
.../source/dataregion/realtime/PipeRealtimeDataRegionSource.java | 7 +++++++
.../pipe/agent/task/connection/UnboundedBlockingPendingQueue.java | 4 ++++
2 files changed, 11 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 1e47a48e500..718529243d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -427,6 +427,13 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
}
protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+ // Remove any heartbeat events in front of this event to avoid OOM
+ // Since the batch and retry queue no longer need the heartbeat event to
trigger
+ // And the progress report event can trigger the processor calculation
because it's not reported
+ // yet
+ while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof
PipeHeartbeatEvent) {
+ pendingQueue.pollLast();
+ }
if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
final ProgressReportEvent oldEvent = (ProgressReportEvent)
pendingQueue.peekLast();
oldEvent.bindProgressIndex(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
index 785e89cfb9a..43fa64c158e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
@@ -37,4 +37,8 @@ public class UnboundedBlockingPendingQueue<E extends Event>
extends BlockingPend
public E peekLast() {
return pendingDeque.peekLast();
}
+
+ public E pollLast() {
+ return pendingDeque.pollLast();
+ }
}