This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 065d0d79152acd4f195fd9a1b83deb06240eb3ad Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Aug 7 15:30:09 2025 +0800 Pipe: Delete the heartbeat event count in Remaining Count (#16115) * Pipe: Delete the heartbeat event count in Remaining Count * update * delete getRemainingEvents function (cherry picked from commit 275fe395aee81df6e6cbb984c56bba86a2fc6eb2) --- .../PipeDataNodeRemainingEventAndTimeOperator.java | 17 ----------------- .../metric/overview/PipeDataNodeSinglePipeMetrics.java | 4 ++-- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index db19cc477d8..f54e8bcfccb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -120,23 +120,6 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper return insertNodeEventCount.get(); } - long getRemainingEvents() { - final long remainingEvents = - tsfileEventCount.get() - + rawTabletEventCount.get() - + insertNodeEventCount.get() - + heartbeatEventCount.get() - + schemaRegionExtractors.stream() - .map(IoTDBSchemaRegionSource::getUnTransferredEventCount) - .reduce(Long::sum) - .orElse(0L); - - // There are cases where the indicator is negative. For example, after the Pipe is restarted, - // the Processor SubTask is still collecting Events, resulting in a negative count. This - // situation cannot be avoided because the Pipe may be restarted internally. - return remainingEvents >= 0 ? remainingEvents : 0; - } - /** * This will calculate the estimated remaining time of pipe. 
* diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index f52b28ba2d7..7ff3c428f6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -89,7 +89,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { Metric.PIPE_DATANODE_REMAINING_EVENT_COUNT.toString(), MetricLevel.IMPORTANT, operator, - PipeDataNodeRemainingEventAndTimeOperator::getRemainingEvents, + PipeDataNodeRemainingEventAndTimeOperator::getRemainingNonHeartbeatEvents, Tag.NAME.toString(), operator.getPipeName(), Tag.CREATION_TIME.toString(), @@ -403,7 +403,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { remainingEventAndTimeOperatorMap.computeIfAbsent( pipeName + "_" + creationTime, k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); - return new Pair<>(operator.getRemainingEvents(), operator.getRemainingTime()); + return new Pair<>(operator.getRemainingNonHeartbeatEvents(), operator.getRemainingTime()); } //////////////////////////// singleton ////////////////////////////
