This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 47e02c889e7 Pipe: Fix lost transfer time metric & Support transfer
time tracking for RawTablet events converted from InsertNode/TsFile events
(#16364)
47e02c889e7 is described below
commit 47e02c889e735bf5fa243b67d79d78cc54f44b39
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Sep 8 18:50:49 2025 +0800
Pipe: Fix lost transfer time metric & Support transfer time tracking for
RawTablet events converted from InsertNode/TsFile events (#16364)
* fix metric
* fix
* fix
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +++
.../common/tablet/PipeRawTabletInsertionEvent.java | 19 ++++++++++++
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +++
.../overview/PipeDataNodeSinglePipeMetrics.java | 34 ++++++++++++++++++++++
4 files changed, 61 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 855f33b12e2..2006b4105c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -162,6 +162,10 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
: null;
}
+ public long getExtractTime() {
+ return extractTime;
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 322b5ea2435..37492ee21ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -263,6 +263,25 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
// Actually release the occupied memory.
tablet = null;
eventParser = null;
+
+ // Update metrics of the source event
+ if (needToReport && shouldReportOnCommit && Objects.nonNull(pipeName)) {
+ if (sourceEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ PipeDataNodeSinglePipeMetrics.getInstance()
+ .updateInsertNodeTransferTimer(
+ pipeName,
+ creationTime,
+ System.nanoTime()
+ - ((PipeInsertNodeTabletInsertionEvent) sourceEvent).getExtractTime());
+ } else if (sourceEvent instanceof PipeTsFileInsertionEvent) {
+ PipeDataNodeSinglePipeMetrics.getInstance()
+ .updateTsFileTransferTimer(
+ pipeName,
+ creationTime,
+ System.nanoTime() - ((PipeTsFileInsertionEvent) sourceEvent).getExtractTime());
+ }
+ }
+
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 454c9cf0063..0461b1f5359 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -298,6 +298,10 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
return resource.getTimePartition();
}
+ public long getExtractTime() {
+ return extractTime;
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 54705fc84cf..8c1ef90f97f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
@@ -236,7 +237,24 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
+
operator.decreaseInsertNodeEventCount();
+
+ if (transferTime > 0) {
+ operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void updateInsertNodeTransferTimer(
+ final String pipeName, final long creationTime, final long transferTime) {
+ if (transferTime > 0) {
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime))
+ .getInsertNodeTransferTimer()
+ .update(transferTime, TimeUnit.NANOSECONDS);
+ }
}
public void increaseRawTabletEventCount(final String pipeName, final long
creationTime) {
@@ -271,6 +289,22 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
operator.decreaseTsFileEventCount();
+
+ if (transferTime > 0) {
+ operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void updateTsFileTransferTimer(
+ final String pipeName, final long creationTime, final long transferTime) {
+ if (transferTime > 0) {
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent(
+ pipeName + "_" + creationTime,
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime))
+ .getTsFileTransferTimer()
+ .update(transferTime, TimeUnit.NANOSECONDS);
+ }
}
public void increaseHeartbeatEventCount(final String pipeName, final long
creationTime) {