This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 98ca22d92f4 [To dev/1.3] Pipe: Fix lost transfer time metric & Support
transfer time tracking for RawTablet events converted from InsertNode/TsFile
events (#16364) (#16370)
98ca22d92f4 is described below
commit 98ca22d92f420f0a76781fb7a62315005ccc5f2c
Author: nanxiang xia <[email protected]>
AuthorDate: Tue Sep 9 09:42:42 2025 +0800
[To dev/1.3] Pipe: Fix lost transfer time metric & Support transfer time
tracking for RawTablet events converted from InsertNode/TsFile events (#16364)
(#16370)
* fix metric
* fix
* fix
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 +++
.../common/tablet/PipeRawTabletInsertionEvent.java | 19 ++++++++++++
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +++
.../overview/PipeDataNodeSinglePipeMetrics.java | 34 ++++++++++++++++++++++
4 files changed, 61 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 772d811292b..c8390a8baa1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -115,6 +115,10 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
: null;
}
+ public long getExtractTime() { // Accessor added so other event classes can read this event's extract timestamp.
+ return extractTime; // NOTE(review): presumably a System.nanoTime() reading, since the caller in PipeRawTabletInsertionEvent subtracts it from System.nanoTime() - confirm.
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5ae27d5eb3c..2443283c1c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -158,6 +158,25 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent
// Actually release the occupied memory.
tablet = null;
dataContainer = null;
+
+ // Update metrics of the source event
+ if (needToReport && shouldReportOnCommit && Objects.nonNull(pipeName)) {
+ if (sourceEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ PipeDataNodeSinglePipeMetrics.getInstance()
+ .updateInsertNodeTransferTimer(
+ pipeName,
+ creationTime,
+ System.nanoTime()
+ - ((PipeInsertNodeTabletInsertionEvent)
sourceEvent).getExtractTime());
+ } else if (sourceEvent instanceof PipeTsFileInsertionEvent) {
+ PipeDataNodeSinglePipeMetrics.getInstance()
+ .updateTsFileTransferTimer(
+ pipeName,
+ creationTime,
+ System.nanoTime() - ((PipeTsFileInsertionEvent)
sourceEvent).getExtractTime());
+ }
+ }
+
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index baa6ae9a3ba..311190f0181 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -247,6 +247,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
return resource.getTimePartition();
}
+ public long getExtractTime() { // Accessor added so other event classes can read this event's extract timestamp.
+ return extractTime; // NOTE(review): presumably a System.nanoTime() reading, since the caller in PipeRawTabletInsertionEvent subtracts it from System.nanoTime() - confirm.
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 54705fc84cf..8c1ef90f97f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
@@ -236,7 +237,24 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
+
operator.decreaseInsertNodeEventCount();
+
+ if (transferTime > 0) {
+ operator.getInsertNodeTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void updateInsertNodeTransferTimer( // Feeds one duration into the insert-node transfer timer; unlike the decrease method above, it leaves the event count alone.
+ final String pipeName, final long creationTime, final long transferTime)
{
+ if (transferTime > 0) { // Zero or negative durations are dropped without recording anything.
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent( // Looks up the operator for this pipe, creating it if no entry exists yet.
+ pipeName + "_" + creationTime, // Map key is the pipe name and creation time joined by an underscore.
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
+ .getInsertNodeTransferTimer()
+ .update(transferTime, TimeUnit.NANOSECONDS); // transferTime is handed to the timer as nanoseconds.
+ }
}
public void increaseRawTabletEventCount(final String pipeName, final long
creationTime) {
@@ -271,6 +289,22 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
operator.decreaseTsFileEventCount();
+
+ if (transferTime > 0) {
+ operator.getTsFileTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ }
+ }
+
+ public void updateTsFileTransferTimer( // Feeds one duration into the TsFile transfer timer; unlike the decrease method above, it leaves the event count alone.
+ final String pipeName, final long creationTime, final long transferTime)
{
+ if (transferTime > 0) { // Zero or negative durations are dropped without recording anything.
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent( // Looks up the operator for this pipe, creating it if no entry exists yet.
+ pipeName + "_" + creationTime, // Map key is the pipe name and creation time joined by an underscore.
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
+ .getTsFileTransferTimer()
+ .update(transferTime, TimeUnit.NANOSECONDS); // transferTime is handed to the timer as nanoseconds.
+ }
}
public void increaseHeartbeatEventCount(final String pipeName, final long
creationTime) {