This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch tsfile-2-tablet-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d6408c5ebdaf8e20b6828cc098e14d0fac990c0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Mar 26 20:50:11 2025 +0800

    impl
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  11 +-
 .../tsfile/parser/TsFileInsertionEventParser.java  |   4 +-
 .../parser/TsFileInsertionEventParserProvider.java |   6 +
 .../dataregion/IoTDBDataRegionExtractor.java       |   2 +
 .../overview/PipeTsFileToTabletsMetrics.java       | 170 +++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 6 files changed, 190 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 139c5cef37a..73606a04252 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -326,8 +327,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return false;
     }
 
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-        .deregister(pipeName + "_" + creationTime);
+    final String taskId = pipeName + "_" + creationTime;
+    PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
 
     return true;
   }
@@ -343,8 +345,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     if (Objects.nonNull(pipeMeta)) {
       final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .deregister(pipeName + "_" + creationTime);
+      final String taskId = pipeName + "_" + creationTime;
+      PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+      
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
     }
 
     return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index da574de46f6..262bb7d815c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -95,7 +96,8 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
   public void close() {
     try {
       if (pipeName != null) {
-        // report time usage
+        PipeTsFileToTabletsMetrics.getInstance()
+            .recordTsFileToTabletTime(pipeName, System.nanoTime() - 
initialTimeNano);
       }
     } catch (final Exception e) {
       LOGGER.warn("Failed to report time usage for parsing tsfile for pipe 
{}", pipeName, e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index 4cf6af7f4db..1dee3f79df9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
 
@@ -78,6 +79,11 @@ public class TsFileInsertionEventParserProvider {
   }
 
   public TsFileInsertionEventParser provide() throws IOException {
+    if (pipeName != null) {
+      PipeTsFileToTabletsMetrics.getInstance()
+          .markTsFileToTabletInvocation(pipeName + "_" + creationTime);
+    }
+
     if (sourceEvent.isTableModelEvent()) {
       return new TsFileInsertionEventTableParser(
           pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index b8e5bec509a..8396cf2c581 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -567,6 +568,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     // register metric after generating taskID
     PipeDataRegionExtractorMetrics.getInstance().register(this);
+    PipeTsFileToTabletsMetrics.getInstance().register(this);
     PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
new file mode 100644
index 00000000000..04015fe6f82
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric.overview;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+public class PipeTsFileToTabletsMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileToTabletsMetrics.class);
+
+  @SuppressWarnings("java:S3077")
+  private volatile AbstractMetricService metricService;
+
+  private final ConcurrentSkipListSet<String> pipe = new 
ConcurrentSkipListSet<>();
+  private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
+  private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    ImmutableSet.copyOf(pipe).forEach(this::createMetrics);
+  }
+
+  private void createMetrics(final String pipeID) {
+    pipeTimerMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateTimer(
+            Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeRateMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    ImmutableSet.copyOf(pipe).forEach(this::deregister);
+    if (!pipe.isEmpty()) {
+      LOGGER.warn(
+          "Failed to unbind from pipe tsfile to tablets metrics, pipe map is 
not empty, pipe: {}",
+          pipe);
+    }
+  }
+
+  private void removeMetrics(final String pipeID) {
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTimerMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeRateMap.remove(pipeID);
+  }
+
+  //////////////////////////// register & deregister 
////////////////////////////
+
+  public void register(final IoTDBDataRegionExtractor extractor) {
+    final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
+    pipe.add(pipeID);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  public void deregister(final String pipeID) {
+    if (!pipe.contains(pipeID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe tsfile to tablets metrics, pipeID({}) 
does not exist", pipeID);
+      return;
+    }
+    try {
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(pipeID);
+      }
+    } finally {
+      pipe.remove(pipeID);
+    }
+  }
+
+  //////////////////////////// pipe integration ////////////////////////////
+
+  public void markTsFileToTabletInvocation(final String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = pipeRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.info(
+          "Failed to mark pipe tsfile to tablets invocation, pipeID({}) does 
not exist", taskID);
+      return;
+    }
+    rate.mark();
+  }
+
+  public void recordTsFileToTabletTime(final String taskID, long 
costTimeInNanos) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Timer timer = pipeTimerMap.get(taskID);
+    if (timer == null) {
+      LOGGER.info(
+          "Failed to record pipe tsfile to tablets time, pipeID({}) does not 
exist", taskID);
+      return;
+    }
+    timer.updateNanos(costTimeInNanos);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class Holder {
+
+    private static final PipeTsFileToTabletsMetrics INSTANCE = new 
PipeTsFileToTabletsMetrics();
+
+    private Holder() {
+      // Empty constructor
+    }
+  }
+
+  public static PipeTsFileToTabletsMetrics getInstance() {
+    return Holder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 554f7d6226b..755ad8187df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -177,6 +177,8 @@ public enum Metric {
   PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
   PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
   PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
+  PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
+  PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
   // subscription related
   SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),

Reply via email to