This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch tsfile-2-tablet-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d6408c5ebdaf8e20b6828cc098e14d0fac990c0 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Mar 26 20:50:11 2025 +0800 impl --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 11 +- .../tsfile/parser/TsFileInsertionEventParser.java | 4 +- .../parser/TsFileInsertionEventParserProvider.java | 6 + .../dataregion/IoTDBDataRegionExtractor.java | 2 + .../overview/PipeTsFileToTabletsMetrics.java | 170 +++++++++++++++++++++ .../iotdb/commons/service/metric/enums/Metric.java | 2 + 6 files changed, 190 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 139c5cef37a..73606a04252 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -48,6 +48,7 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -326,8 +327,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return false; } - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .deregister(pipeName + "_" + creationTime); + final String taskId = pipeName + "_" + creationTime; + PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); + PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); return true; } @@ -343,8 +345,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { if (Objects.nonNull(pipeMeta)) { final long creationTime = pipeMeta.getStaticMeta().getCreationTime(); - PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .deregister(pipeName + "_" + creationTime); + final String taskId = pipeName + "_" + creationTime; + PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); + PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId); } return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index da574de46f6..262bb7d815c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; +import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -95,7 +96,8 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { public void close() { try { if (pipeName != null) { - // report time usage + PipeTsFileToTabletsMetrics.getInstance() + .recordTsFileToTabletTime(pipeName, System.nanoTime() - initialTimeNano); } } catch (final Exception e) { LOGGER.warn("Failed to report time usage for parsing tsfile for pipe {}", pipeName, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java index 4cf6af7f4db..1dee3f79df9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; +import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource; @@ -78,6 +79,11 @@ public class TsFileInsertionEventParserProvider { } public TsFileInsertionEventParser provide() throws IOException { + if (pipeName != null) { + PipeTsFileToTabletsMetrics.getInstance() + .markTsFileToTabletInvocation(pipeName + "_" + creationTime); + } + if (sourceEvent.isTableModelEvent()) { return new TsFileInsertionEventTableParser( pipeName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index b8e5bec509a..8396cf2c581 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; @@ -567,6 +568,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { // register metric after generating taskID PipeDataRegionExtractorMetrics.getInstance().register(this); + PipeTsFileToTabletsMetrics.getInstance().register(this); PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java new file mode 100644 index 00000000000..04015fe6f82 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.metric.overview; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Rate; +import org.apache.iotdb.metrics.type.Timer; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; + +public class PipeTsFileToTabletsMetrics implements IMetricSet { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileToTabletsMetrics.class); + + @SuppressWarnings("java:S3077") + private volatile AbstractMetricService metricService; + + private final ConcurrentSkipListSet<String> pipe = new ConcurrentSkipListSet<>(); + private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>(); + private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>(); + + //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// + + @Override + public void bindTo(final AbstractMetricService metricService) { + this.metricService = metricService; + ImmutableSet.copyOf(pipe).forEach(this::createMetrics); + } + + private void createMetrics(final String pipeID) { + pipeTimerMap.putIfAbsent( + pipeID, + metricService.getOrCreateTimer( + Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + pipeID)); + pipeRateMap.putIfAbsent( + pipeID, + metricService.getOrCreateRate( + Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + pipeID)); + } + + @Override + public void unbindFrom(final AbstractMetricService metricService) { + ImmutableSet.copyOf(pipe).forEach(this::deregister); + if (!pipe.isEmpty()) { + LOGGER.warn( + "Failed to unbind from pipe tsfile to tablets metrics, pipe map is not empty, pipe: {}", + pipe); + } + } + + private void removeMetrics(final String pipeID) { + metricService.remove( + MetricType.TIMER, + Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(), + Tag.NAME.toString(), + pipeID); + pipeTimerMap.remove(pipeID); + + metricService.remove( + MetricType.RATE, + Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(), + Tag.NAME.toString(), + pipeID); + pipeRateMap.remove(pipeID); + } + + //////////////////////////// register & deregister //////////////////////////// + + public void register(final IoTDBDataRegionExtractor extractor) { + final String pipeID = extractor.getPipeName() + "_" + extractor.getCreationTime(); + pipe.add(pipeID); + if (Objects.nonNull(metricService)) { + createMetrics(pipeID); + } + } + + public void deregister(final String pipeID) { + if (!pipe.contains(pipeID)) { + LOGGER.warn( + "Failed to deregister pipe tsfile to tablets metrics, pipeID({}) does not exist", pipeID); + return; + } + try { + if (Objects.nonNull(metricService)) { + removeMetrics(pipeID); + } + } finally { + pipe.remove(pipeID); + } + } + + //////////////////////////// pipe integration //////////////////////////// + + public void markTsFileToTabletInvocation(final String taskID) { + if (Objects.isNull(metricService)) { + return; + } + final Rate rate = pipeRateMap.get(taskID); + if (rate == null) { + LOGGER.info( + "Failed to mark pipe tsfile to tablets invocation, pipeID({}) does not exist", taskID); + return; + } + rate.mark(); + } + + public void recordTsFileToTabletTime(final String taskID, long costTimeInNanos) { + if (Objects.isNull(metricService)) { + return; + } + final Timer timer = pipeTimerMap.get(taskID); + if (timer == null) { + LOGGER.info( + "Failed to record pipe tsfile to tablets time, pipeID({}) does not exist", taskID); + return; + } + timer.updateNanos(costTimeInNanos); + } + + //////////////////////////// singleton //////////////////////////// + + private static class Holder { + + private static final PipeTsFileToTabletsMetrics INSTANCE = new PipeTsFileToTabletsMetrics(); + + private Holder() { + // Empty constructor + } + } + + public static PipeTsFileToTabletsMetrics getInstance() { + return Holder.INSTANCE; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 554f7d6226b..755ad8187df 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -177,6 +177,8 @@ public enum Metric { PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"), PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"), PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"), + PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"), + PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"), // subscription related SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"), SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
