This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 47e02c889e7 Pipe: Fix lost transfer time metric & Support transfer time tracking for RawTablet events converted from InsertNode/TsFile events (#16364)
47e02c889e7 is described below

commit 47e02c889e735bf5fa243b67d79d78cc54f44b39
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Sep 8 18:50:49 2025 +0800

    Pipe: Fix lost transfer time metric & Support transfer time tracking for RawTablet events converted from InsertNode/TsFile events (#16364)
    
    * fix metric
    
    * fix
    
    * fix
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  4 +++
 .../common/tablet/PipeRawTabletInsertionEvent.java | 19 ++++++++++++
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +++
 .../overview/PipeDataNodeSinglePipeMetrics.java    | 34 ++++++++++++++++++++++
 4 files changed, 61 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 855f33b12e2..2006b4105c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -162,6 +162,10 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
         : null;
   }
 
+  public long getExtractTime() {
+    return extractTime;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 322b5ea2435..37492ee21ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -263,6 +263,25 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
     // Actually release the occupied memory.
     tablet = null;
     eventParser = null;
+
+    // Update metrics of the source event
+    if (needToReport && shouldReportOnCommit && Objects.nonNull(pipeName)) {
+      if (sourceEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        PipeDataNodeSinglePipeMetrics.getInstance()
+            .updateInsertNodeTransferTimer(
+                pipeName,
+                creationTime,
+                System.nanoTime()
+                    - ((PipeInsertNodeTabletInsertionEvent) sourceEvent).getExtractTime());
+      } else if (sourceEvent instanceof PipeTsFileInsertionEvent) {
+        PipeDataNodeSinglePipeMetrics.getInstance()
+            .updateTsFileTransferTimer(
+                pipeName,
+                creationTime,
+                System.nanoTime() - ((PipeTsFileInsertionEvent) sourceEvent).getExtractTime());
+      }
+    }
+
     return true;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 454c9cf0063..0461b1f5359 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -298,6 +298,10 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
     return resource.getTimePartition();
   }
 
+  public long getExtractTime() {
+    return extractTime;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 54705fc84cf..8c1ef90f97f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
 
@@ -236,7 +237,24 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
         remainingEventAndTimeOperatorMap.computeIfAbsent(
             pipeName + "_" + creationTime,
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
+
     operator.decreaseInsertNodeEventCount();
+
+    if (transferTime > 0) {
+      operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
+    }
+  }
+
+  public void updateInsertNodeTransferTimer(
+      final String pipeName, final long creationTime, final long transferTime) {
+    if (transferTime > 0) {
+      remainingEventAndTimeOperatorMap
+          .computeIfAbsent(
+              pipeName + "_" + creationTime,
+              k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime))
+          .getInsertNodeTransferTimer()
+          .update(transferTime, TimeUnit.NANOSECONDS);
+    }
   }
 
   public void increaseRawTabletEventCount(final String pipeName, final long creationTime) {
@@ -271,6 +289,22 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
 
     operator.decreaseTsFileEventCount();
+
+    if (transferTime > 0) {
+      operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
+    }
+  }
+
+  public void updateTsFileTransferTimer(
+      final String pipeName, final long creationTime, final long transferTime) {
+    if (transferTime > 0) {
+      remainingEventAndTimeOperatorMap
+          .computeIfAbsent(
+              pipeName + "_" + creationTime,
+              k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime))
+          .getTsFileTransferTimer()
+          .update(transferTime, TimeUnit.NANOSECONDS);
+    }
   }
 
   public void increaseHeartbeatEventCount(final String pipeName, final long creationTime) {

Reply via email to