This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f616e3441c0 Pipe: Exclude the tsFiles / insertNodes from transfer time 
metric which have not been sent (#16015)
f616e3441c0 is described below

commit f616e3441c089ad1728593da7d2b263fc39dbff0
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 16:55:40 2025 +0800

    Pipe: Exclude the tsFiles / insertNodes from transfer time metric which 
have not been sent (#16015)
---
 .../common/tablet/PipeInsertNodeTabletInsertionEvent.java    |  5 ++++-
 .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java   |  5 ++++-
 .../pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java  | 12 ++++++++----
 .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java   |  6 +++---
 4 files changed, 19 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 329a123c489..ffa4da79466 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -211,7 +211,10 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
         PipeDataNodeAgent.task()
             .decreaseFloatingMemoryUsageInByte(pipeName, creationTime, 
ramBytesUsed());
         PipeDataNodeSinglePipeMetrics.getInstance()
-            .decreaseInsertNodeEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
+            .decreaseInsertNodeEventCount(
+                pipeName,
+                creationTime,
+                shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
       }
       insertNode = null;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d791dddc713..ce9e70a2b42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -337,7 +337,10 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     } finally {
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeSinglePipeMetrics.getInstance()
-            .decreaseTsFileEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
+            .decreaseTsFileEventCount(
+                pipeName,
+                creationTime,
+                shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 65efae61714..70b4561bb5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -269,8 +269,10 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
     operator.decreaseInsertNodeEventCount();
 
-    operator.getInsertNodeTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
-    PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+    if (transferTime > 0) {
+      operator.getInsertNodeTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
+      PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+    }
   }
 
   public void increaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
@@ -305,8 +307,10 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
 
     operator.decreaseTsFileEventCount();
-    operator.getTsFileTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
-    PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+    if (transferTime > 0) {
+      operator.getTsFileTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
+      PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+    }
   }
 
   public void increaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 34e3f80dcf5..c7e0345cc59 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -202,6 +202,9 @@ public abstract class EnrichedEvent implements Event {
     }
 
     if (referenceCount.get() == 1) {
+      if (!shouldReport) {
+        shouldReportOnCommit = false;
+      }
       // We assume that this function will not throw any exceptions.
       if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
         LOGGER.warn(
@@ -209,9 +212,6 @@ public abstract class EnrichedEvent implements Event {
             coreReportMessage(),
             Thread.currentThread().getStackTrace());
       }
-      if (!shouldReport) {
-        shouldReportOnCommit = false;
-      }
       PipeEventCommitManager.getInstance().commit(this, committerKey);
     }
 

Reply via email to