This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e86ec17bc95 Pipe: Add metrics for tsfile to tablets invocation call count and time (#15202)
e86ec17bc95 is described below

commit e86ec17bc95523e3c672dbf293717e4dcbdde710
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Mar 27 11:23:56 2025 +0800

    Pipe: Add metrics for tsfile to tablets invocation call count and time (#15202)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  11 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   2 +
 .../tsfile/parser/TsFileInsertionEventParser.java  |  21 +++
 .../parser/TsFileInsertionEventParserProvider.java |  61 +++++++-
 .../query/TsFileInsertionEventQueryParser.java     |  19 ++-
 .../scan/TsFileInsertionEventScanParser.java       |  15 +-
 .../table/TsFileInsertionEventTableParser.java     |  16 +-
 .../dataregion/IoTDBDataRegionExtractor.java       |   2 +
 .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java  |   3 +
 .../overview/PipeTsFileToTabletsMetrics.java       | 170 +++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 11 files changed, 308 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 139c5cef37a..73606a04252 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -326,8 +327,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return false;
     }
 
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-        .deregister(pipeName + "_" + creationTime);
+    final String taskId = pipeName + "_" + creationTime;
+    PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
 
     return true;
   }
@@ -343,8 +345,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     if (Objects.nonNull(pipeMeta)) {
       final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .deregister(pipeName + "_" + creationTime);
+      final String taskId = pipeName + "_" + creationTime;
+      PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+      
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
     }
 
     return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 357e11e2f39..04cf1d58e59 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -664,6 +664,8 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
       eventParser.compareAndSet(
           null,
           new TsFileInsertionEventParserProvider(
+                  pipeName,
+                  creationTime,
                   tsFile,
                   treePattern,
                   tablePattern,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index a8b3a7f33c5..4d38e196e27 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -39,6 +40,9 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionEventParser.class);
 
+  protected final String pipeName;
+  protected final long creationTime;
+
   protected final TreePattern treePattern; // used to filter data
   protected final TablePattern tablePattern; // used to filter data
   protected final GlobalTimeExpression timeFilterExpression; // used to filter 
data
@@ -48,17 +52,24 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
   protected final PipeTaskMeta pipeTaskMeta; // used to report progress
   protected final PipeInsertionEvent sourceEvent; // used to report progress
 
+  protected final long initialTimeNano = System.nanoTime();
+
   protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
 
   protected TsFileSequenceReader tsFileSequenceReader;
 
   protected TsFileInsertionEventParser(
+      final String pipeName,
+      final long creationTime,
       final TreePattern treePattern,
       final TablePattern tablePattern,
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipeInsertionEvent sourceEvent) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+
     this.treePattern = treePattern;
     this.tablePattern = tablePattern;
     timeFilterExpression =
@@ -83,6 +94,16 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
 
   @Override
   public void close() {
+    try {
+      if (pipeName != null) {
+        PipeTsFileToTabletsMetrics.getInstance()
+            .recordTsFileToTabletTime(
+                pipeName + "_" + creationTime, System.nanoTime() - 
initialTimeNano);
+      }
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to report time usage for parsing tsfile for pipe 
{}", pipeName, e);
+    }
+
     try {
       if (tsFileSequenceReader != null) {
         tsFileSequenceReader.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index 200130f6e14..1dee3f79df9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
 
@@ -41,6 +42,9 @@ import java.util.stream.Collectors;
 
 public class TsFileInsertionEventParserProvider {
 
+  private final String pipeName;
+  private final long creationTime;
+
   private final File tsFile;
   private final TreePattern treePattern;
   private final TablePattern tablePattern;
@@ -52,6 +56,8 @@ public class TsFileInsertionEventParserProvider {
   private final String userName;
 
   public TsFileInsertionEventParserProvider(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern treePattern,
       final TablePattern tablePattern,
@@ -60,6 +66,8 @@ public class TsFileInsertionEventParserProvider {
       final PipeTaskMeta pipeTaskMeta,
       final String userName,
       final PipeTsFileInsertionEvent sourceEvent) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
     this.tsFile = tsFile;
     this.treePattern = treePattern;
     this.tablePattern = tablePattern;
@@ -71,9 +79,22 @@ public class TsFileInsertionEventParserProvider {
   }
 
   public TsFileInsertionEventParser provide() throws IOException {
+    if (pipeName != null) {
+      PipeTsFileToTabletsMetrics.getInstance()
+          .markTsFileToTabletInvocation(pipeName + "_" + creationTime);
+    }
+
     if (sourceEvent.isTableModelEvent()) {
       return new TsFileInsertionEventTableParser(
-          tsFile, tablePattern, startTime, endTime, pipeTaskMeta, userName, 
sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          tablePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          userName,
+          sourceEvent);
     }
 
     // Use scan container to save memory
@@ -81,7 +102,14 @@ public class TsFileInsertionEventParserProvider {
             / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
         > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
       return new TsFileInsertionEventScanParser(
-          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          treePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent);
     }
 
     if (treePattern instanceof IoTDBTreePattern
@@ -93,7 +121,14 @@ public class TsFileInsertionEventParserProvider {
       // hard to know whether it only matches one timeseries, while matching 
multiple is often the
       // case.
       return new TsFileInsertionEventQueryParser(
-          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          treePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent);
     }
 
     final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -102,7 +137,14 @@ public class TsFileInsertionEventParserProvider {
       // If we failed to get from cache, it indicates that the memory usage is 
high.
       // We use scan data container because it requires less memory.
       return new TsFileInsertionEventScanParser(
-          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          treePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent);
     }
 
     final int originalSize = deviceIsAlignedMap.size();
@@ -112,8 +154,17 @@ public class TsFileInsertionEventParserProvider {
     return (double) filteredDeviceIsAlignedMap.size() / originalSize
             > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
         ? new TsFileInsertionEventScanParser(
-            tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent)
+            pipeName,
+            creationTime,
+            tsFile,
+            treePattern,
+            startTime,
+            endTime,
+            pipeTaskMeta,
+            sourceEvent)
         : new TsFileInsertionEventQueryParser(
+            pipeName,
+            creationTime,
             tsFile,
             treePattern,
             startTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index 52a2b054c35..ba07299d66b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -75,10 +75,12 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
       final long endTime,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    this(tsFile, pattern, startTime, endTime, null, sourceEvent);
+    this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent);
   }
 
   public TsFileInsertionEventQueryParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern pattern,
       final long startTime,
@@ -86,10 +88,21 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
       final PipeTaskMeta pipeTaskMeta,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
+    this(
+        pipeName,
+        creationTime,
+        tsFile,
+        pattern,
+        startTime,
+        endTime,
+        pipeTaskMeta,
+        sourceEvent,
+        null);
   }
 
   public TsFileInsertionEventQueryParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern pattern,
       final long startTime,
@@ -98,7 +111,7 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
       final PipeInsertionEvent sourceEvent,
       final Map<IDeviceID, Boolean> deviceIsAlignedMap)
       throws IOException {
-    super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, pattern, null, startTime, endTime, 
pipeTaskMeta, sourceEvent);
 
     try {
       final PipeTsFileResourceManager tsFileResourceManager = 
PipeDataNodeResourceManager.tsfile();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 54bdbc174a8..1559d472e04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -87,6 +87,8 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
   private byte lastMarker = Byte.MIN_VALUE;
 
   public TsFileInsertionEventScanParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern pattern,
       final long startTime,
@@ -94,7 +96,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
       final PipeTaskMeta pipeTaskMeta,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, pattern, null, startTime, endTime, 
pipeTaskMeta, sourceEvent);
 
     this.startTime = startTime;
     this.endTime = endTime;
@@ -115,6 +117,17 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
     }
   }
 
+  public TsFileInsertionEventScanParser(
+      final File tsFile,
+      final TreePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipeInsertionEvent sourceEvent)
+      throws IOException {
+    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index b363a163063..8011f9c3830 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -53,6 +53,8 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
   private final PipeMemoryBlock allocatedMemoryBlockForTableSchemas;
 
   public TsFileInsertionEventTableParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TablePattern pattern,
       final long startTime,
@@ -61,7 +63,7 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
       final String userName,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, null, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
 
     try {
       this.allocatedMemoryBlockForChunk =
@@ -85,6 +87,18 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
     }
   }
 
+  public TsFileInsertionEventTableParser(
+      final File tsFile,
+      final TablePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final String userName,
+      final PipeInsertionEvent sourceEvent)
+      throws IOException {
+    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, userName, 
sourceEvent);
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index b8e5bec509a..8396cf2c581 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -567,6 +568,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     // register metric after generating taskID
     PipeDataRegionExtractorMetrics.getInstance().register(this);
+    PipeTsFileToTabletsMetrics.getInstance().register(this);
     PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 004b1bccf77..3f03ce580fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.metric.receiver.PipeDataNodeReceiverMetrics;
@@ -54,6 +55,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
     
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
     PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
+    PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -71,6 +73,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
     
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
     PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
+    PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService);
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
new file mode 100644
index 00000000000..04015fe6f82
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric.overview;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+public class PipeTsFileToTabletsMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileToTabletsMetrics.class);
+
+  @SuppressWarnings("java:S3077")
+  private volatile AbstractMetricService metricService;
+
+  private final ConcurrentSkipListSet<String> pipe = new 
ConcurrentSkipListSet<>();
+  private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
+  private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    ImmutableSet.copyOf(pipe).forEach(this::createMetrics);
+  }
+
+  private void createMetrics(final String pipeID) {
+    pipeTimerMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateTimer(
+            Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeRateMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    ImmutableSet.copyOf(pipe).forEach(this::deregister);
+    if (!pipe.isEmpty()) {
+      LOGGER.warn(
+          "Failed to unbind from pipe tsfile to tablets metrics, pipe map is 
not empty, pipe: {}",
+          pipe);
+    }
+  }
+
+  private void removeMetrics(final String pipeID) {
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTimerMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeRateMap.remove(pipeID);
+  }
+
+  //////////////////////////// register & deregister 
////////////////////////////
+
+  public void register(final IoTDBDataRegionExtractor extractor) {
+    final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
+    pipe.add(pipeID);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  public void deregister(final String pipeID) {
+    if (!pipe.contains(pipeID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe tsfile to tablets metrics, pipeID({}) 
does not exist", pipeID);
+      return;
+    }
+    try {
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(pipeID);
+      }
+    } finally {
+      pipe.remove(pipeID);
+    }
+  }
+
+  //////////////////////////// pipe integration ////////////////////////////
+
+  public void markTsFileToTabletInvocation(final String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = pipeRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.info(
+          "Failed to mark pipe tsfile to tablets invocation, pipeID({}) does 
not exist", taskID);
+      return;
+    }
+    rate.mark();
+  }
+
+  public void recordTsFileToTabletTime(final String taskID, long 
costTimeInNanos) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Timer timer = pipeTimerMap.get(taskID);
+    if (timer == null) {
+      LOGGER.info(
+          "Failed to record pipe tsfile to tablets time, pipeID({}) does not 
exist", taskID);
+      return;
+    }
+    timer.updateNanos(costTimeInNanos);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class Holder {
+
+    private static final PipeTsFileToTabletsMetrics INSTANCE = new 
PipeTsFileToTabletsMetrics();
+
+    private Holder() {
+      // Empty constructor
+    }
+  }
+
+  public static PipeTsFileToTabletsMetrics getInstance() {
+    return Holder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 554f7d6226b..755ad8187df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -177,6 +177,8 @@ public enum Metric {
   PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
   PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
   PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
+  PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
+  PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
   // subscription related
   SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),

Reply via email to