This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 01dcf32fbf0 [To dev/1.3] Pipe: Delete the heartbeat event count in Remaining Count #16115 (#16116)
01dcf32fbf0 is described below

commit 01dcf32fbf0e9a72164f9525fbf28a66c5cf46d3
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 7 14:41:31 2025 +0800

    [To dev/1.3] Pipe: Delete the heartbeat event count in Remaining Count #16115 (#16116)
    
    * Pipe: Delete the heartbeat event count in Remaining Count
    
    * update
    
    * delete getRemainingEvents function
---
 .../PipeDataNodeRemainingEventAndTimeOperator.java      | 17 -----------------
 .../metric/overview/PipeDataNodeSinglePipeMetrics.java  |  4 ++--
 2 files changed, 2 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index db19cc477d8..f54e8bcfccb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -120,23 +120,6 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper
     return insertNodeEventCount.get();
   }
 
-  long getRemainingEvents() {
-    final long remainingEvents =
-        tsfileEventCount.get()
-            + rawTabletEventCount.get()
-            + insertNodeEventCount.get()
-            + heartbeatEventCount.get()
-            + schemaRegionExtractors.stream()
-                .map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
-                .reduce(Long::sum)
-                .orElse(0L);
-
-    // There are cases where the indicator is negative. For example, after the Pipe is restarted,
-    // the Processor SubTask is still collecting Events, resulting in a negative count. This
-    // situation cannot be avoided because the Pipe may be restarted internally.
-    return remainingEvents >= 0 ? remainingEvents : 0;
-  }
-
   /**
    * This will calculate the estimated remaining time of pipe.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index f52b28ba2d7..7ff3c428f6e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -89,7 +89,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
         Metric.PIPE_DATANODE_REMAINING_EVENT_COUNT.toString(),
         MetricLevel.IMPORTANT,
         operator,
-        PipeDataNodeRemainingEventAndTimeOperator::getRemainingEvents,
+        PipeDataNodeRemainingEventAndTimeOperator::getRemainingNonHeartbeatEvents,
         Tag.NAME.toString(),
         operator.getPipeName(),
         Tag.CREATION_TIME.toString(),
@@ -403,7 +403,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
         remainingEventAndTimeOperatorMap.computeIfAbsent(
             pipeName + "_" + creationTime,
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
-    return new Pair<>(operator.getRemainingEvents(), operator.getRemainingTime());
+    return new Pair<>(operator.getRemainingNonHeartbeatEvents(), operator.getRemainingTime());
   }
 
   //////////////////////////// singleton ////////////////////////////

Reply via email to