This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 01dcf32fbf0 [To dev/1.3] Pipe: Delete the heartbeat event count in
Remaining Count #16115 (#16116)
01dcf32fbf0 is described below
commit 01dcf32fbf0e9a72164f9525fbf28a66c5cf46d3
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 7 14:41:31 2025 +0800
[To dev/1.3] Pipe: Delete the heartbeat event count in Remaining Count
#16115 (#16116)
* Pipe: Delete the heartbeat event count in Remaining Count
* update
* delete getRemainingEvents function
---
.../PipeDataNodeRemainingEventAndTimeOperator.java | 17 -----------------
.../metric/overview/PipeDataNodeSinglePipeMetrics.java | 4 ++--
2 files changed, 2 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index db19cc477d8..f54e8bcfccb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -120,23 +120,6 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
return insertNodeEventCount.get();
}
- long getRemainingEvents() {
- final long remainingEvents =
- tsfileEventCount.get()
- + rawTabletEventCount.get()
- + insertNodeEventCount.get()
- + heartbeatEventCount.get()
- + schemaRegionExtractors.stream()
- .map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
- .reduce(Long::sum)
- .orElse(0L);
-
- // There are cases where the indicator is negative. For example, after the
Pipe is restarted,
- // the Processor SubTask is still collecting Events, resulting in a
negative count. This
- // situation cannot be avoided because the Pipe may be restarted
internally.
- return remainingEvents >= 0 ? remainingEvents : 0;
- }
-
/**
* This will calculate the estimated remaining time of pipe.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index f52b28ba2d7..7ff3c428f6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -89,7 +89,7 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
Metric.PIPE_DATANODE_REMAINING_EVENT_COUNT.toString(),
MetricLevel.IMPORTANT,
operator,
- PipeDataNodeRemainingEventAndTimeOperator::getRemainingEvents,
+
PipeDataNodeRemainingEventAndTimeOperator::getRemainingNonHeartbeatEvents,
Tag.NAME.toString(),
operator.getPipeName(),
Tag.CREATION_TIME.toString(),
@@ -403,7 +403,7 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
- return new Pair<>(operator.getRemainingEvents(),
operator.getRemainingTime());
+ return new Pair<>(operator.getRemainingNonHeartbeatEvents(),
operator.getRemainingTime());
}
//////////////////////////// singleton ////////////////////////////