This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f616e3441c0 Pipe: Exclude the tsFiles / insertNodes from transfer time
metric which have not be sent (#16015)
f616e3441c0 is described below
commit f616e3441c089ad1728593da7d2b263fc39dbff0
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 16:55:40 2025 +0800
Pipe: Exclude the tsFiles / insertNodes from transfer time metric which
have not be sent (#16015)
---
.../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 5 ++++-
.../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 5 ++++-
.../pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java | 12 ++++++++----
.../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 6 +++---
4 files changed, 19 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 329a123c489..ffa4da79466 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -211,7 +211,10 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
PipeDataNodeAgent.task()
.decreaseFloatingMemoryUsageInByte(pipeName, creationTime,
ramBytesUsed());
PipeDataNodeSinglePipeMetrics.getInstance()
- .decreaseInsertNodeEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
+ .decreaseInsertNodeEventCount(
+ pipeName,
+ creationTime,
+ shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
}
insertNode = null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d791dddc713..ce9e70a2b42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -337,7 +337,10 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeSinglePipeMetrics.getInstance()
- .decreaseTsFileEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
+ .decreaseTsFileEventCount(
+ pipeName,
+ creationTime,
+ shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 65efae61714..70b4561bb5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -269,8 +269,10 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
operator.decreaseInsertNodeEventCount();
- operator.getInsertNodeTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
- PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ if (transferTime > 0) {
+ operator.getInsertNodeTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ }
}
public void increaseRawTabletEventCount(final String pipeName, final long
creationTime) {
@@ -305,8 +307,10 @@ public class PipeDataNodeSinglePipeMetrics implements
IMetricSet {
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
operator.decreaseTsFileEventCount();
- operator.getTsFileTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
- PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ if (transferTime > 0) {
+ operator.getTsFileTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
+ }
}
public void increaseHeartbeatEventCount(final String pipeName, final long
creationTime) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 34e3f80dcf5..c7e0345cc59 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -202,6 +202,9 @@ public abstract class EnrichedEvent implements Event {
}
if (referenceCount.get() == 1) {
+ if (!shouldReport) {
+ shouldReportOnCommit = false;
+ }
// We assume that this function will not throw any exceptions.
if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
LOGGER.warn(
@@ -209,9 +212,6 @@ public abstract class EnrichedEvent implements Event {
coreReportMessage(),
Thread.currentThread().getStackTrace());
}
- if (!shouldReport) {
- shouldReportOnCommit = false;
- }
PipeEventCommitManager.getInstance().commit(this, committerKey);
}