This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch add_flush_metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 76e2b12cacb27d8f03fbb254677e6e0808d9bd7b Author: HTHou <[email protected]> AuthorDate: Tue Jun 25 12:04:40 2024 +0800 Add flush metric --- .../iotdb/db/service/metrics/WritingMetrics.java | 60 ++++++++++++++++++++++ .../iotdb/db/storageengine/StorageEngine.java | 4 ++ .../db/storageengine/dataregion/DataRegion.java | 14 +++++ .../iotdb/commons/service/metric/enums/Metric.java | 2 + 4 files changed, 80 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index efe2dacda2d..bf6ce7d0bd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -345,6 +345,8 @@ public class WritingMetrics implements IMetricSet { allDataRegions.forEach(this::createDataRegionMemoryCostMetrics); allDataRegionIds.forEach(this::createFlushingMemTableStatusMetrics); allDataRegionIds.forEach(this::createSeriesFullFlushMemTableCounterMetrics); + allDataRegionIds.forEach(this::createManualFlushMemTableCounterMetrics); + allDataRegionIds.forEach(this::createMemControlFlushMemTableCounterMetrics); allDataRegionIds.forEach(this::createTimedFlushMemTableCounterMetrics); allDataRegionIds.forEach(this::createWalFlushMemTableCounterMetrics); allDataRegionIds.forEach(this::createActiveMemtableCounterMetrics); @@ -371,6 +373,8 @@ public class WritingMetrics implements IMetricSet { removeSeriesFullFlushMemTableCounterMetrics(dataRegionId); removeTimedFlushMemTableCounterMetrics(dataRegionId); removeWalFlushMemTableCounterMetrics(dataRegionId); + removeManualFlushMemTableCounterMetrics(dataRegionId); + removeMemControlFlushMemTableCounterMetrics(dataRegionId); removeActiveMemtableCounterMetrics(dataRegionId); }); removeActiveTimePartitionCounterMetrics(); @@ -481,6 +485,24 @@ public class WritingMetrics implements IMetricSet { dataRegionId.toString()); } + public void createManualFlushMemTableCounterMetrics(DataRegionId dataRegionId) { + MetricService.getInstance() + .getOrCreateCounter( + Metric.MANUAL_FLUSH_MEMTABLE_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.REGION.toString(), + dataRegionId.toString()); + } + + public void createMemControlFlushMemTableCounterMetrics(DataRegionId dataRegionId) { + MetricService.getInstance() + .getOrCreateCounter( + Metric.MEM_CONTROL_FLUSH_MEMTABLE_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.REGION.toString(), + dataRegionId.toString()); + } + public void createActiveMemtableCounterMetrics(DataRegionId dataRegionId) { MetricService.getInstance() .getOrCreateCounter( @@ -522,6 +544,24 @@ public class WritingMetrics implements IMetricSet { dataRegionId.toString()); } + public void removeManualFlushMemTableCounterMetrics(DataRegionId dataRegionId) { + MetricService.getInstance() + .remove( + MetricType.COUNTER, + Metric.MANUAL_FLUSH_MEMTABLE_COUNT.toString(), + Tag.REGION.toString(), + dataRegionId.toString()); + } + + public void removeMemControlFlushMemTableCounterMetrics(DataRegionId dataRegionId) { + MetricService.getInstance() + .remove( + MetricType.COUNTER, + Metric.MEM_CONTROL_FLUSH_MEMTABLE_COUNT.toString(), + Tag.REGION.toString(), + dataRegionId.toString()); + } + public void removeActiveMemtableCounterMetrics(DataRegionId dataRegionId) { MetricService.getInstance() .remove( @@ -793,6 +833,26 @@ public class WritingMetrics implements IMetricSet { dataRegionId); } + public void recordManualFlushMemTableCount(String dataRegionId, int number) { + MetricService.getInstance() + .count( + number, + Metric.MANUAL_FLUSH_MEMTABLE_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.REGION.toString(), + dataRegionId); + } + + public void recordMemControlFlushMemTableCount(String dataRegionId, int number) { + MetricService.getInstance() + .count( + number, + Metric.MEM_CONTROL_FLUSH_MEMTABLE_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.REGION.toString(), + dataRegionId); + } + public void recordActiveMemTableCount(String dataRegionId, int number) { MetricService.getInstance() .count( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index e30069f70f0..6d1f34c8999 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -416,6 +416,8 @@ public class StorageEngine implements IService { WRITING_METRICS.createSeriesFullFlushMemTableCounterMetrics(dataRegionId); WRITING_METRICS.createWalFlushMemTableCounterMetrics(dataRegionId); WRITING_METRICS.createTimedFlushMemTableCounterMetrics(dataRegionId); + WRITING_METRICS.createManualFlushMemTableCounterMetrics(dataRegionId); + WRITING_METRICS.createMemControlFlushMemTableCounterMetrics(dataRegionId); WRITING_METRICS.createActiveMemtableCounterMetrics(dataRegionId); dataRegion.setCustomFlushListeners(customFlushListeners); dataRegion.setCustomCloseFileListeners(customCloseFileListeners); @@ -709,6 +711,8 @@ public class StorageEngine implements IService { WRITING_METRICS.removeWalFlushMemTableCounterMetrics(regionId); WRITING_METRICS.removeTimedFlushMemTableCounterMetrics(regionId); WRITING_METRICS.removeSeriesFullFlushMemTableCounterMetrics(regionId); + WRITING_METRICS.removeManualFlushMemTableCounterMetrics(regionId); + WRITING_METRICS.removeMemControlFlushMemTableCounterMetrics(regionId); try { region.abortCompaction(); region.syncDeleteDataFiles(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 8c473652c93..15a70b10980 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -914,6 +914,7 @@ public class DataRegion implements IDataRegionForQuery { // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 1); } } finally { writeUnlock(); @@ -1089,6 +1090,7 @@ public class DataRegion implements IDataRegionForQuery { // check memtable size and may async try to flush the work memtable if (tsFileProcessor.shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, sequence); + WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 1); } return true; } @@ -1206,6 +1208,7 @@ public class DataRegion implements IDataRegionForQuery { }); } + int count = 0; List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { TsFileProcessor tsFileProcessor = entry.getKey(); @@ -1224,8 +1227,10 @@ public class DataRegion implements IDataRegionForQuery { // check memtable size and may asyncTryToFlush the work memtable if (entry.getKey().shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + count++; } } + WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, count); PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); @@ -1323,6 +1328,7 @@ public class DataRegion implements IDataRegionForQuery { // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 1); } } finally { writeUnlock(); @@ -1490,6 +1496,7 @@ public class DataRegion implements IDataRegionForQuery { e); } } + WritingMetrics.getInstance().recordManualFlushMemTableCount(dataRegionId, 1); } /** @@ -1731,21 +1738,25 @@ public class DataRegion implements IDataRegionForQuery { List<Future<?>> asyncCloseAllWorkingTsFileProcessors() { writeLock("asyncCloseAllWorkingTsFileProcessors"); List<Future<?>> futures = new ArrayList<>(); + int count = 0; try { logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId); // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsFileProcessor : new ArrayList<>(workSequenceTsFileProcessors.values())) { futures.add(asyncCloseOneTsFileProcessor(true, tsFileProcessor)); + count++; } // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsFileProcessor : new ArrayList<>(workUnsequenceTsFileProcessors.values())) { futures.add(asyncCloseOneTsFileProcessor(false, tsFileProcessor)); + count++; } } finally { writeUnlock(); } + WritingMetrics.getInstance().recordManualFlushMemTableCount(dataRegionId, count); return futures; } @@ -3286,6 +3297,7 @@ public class DataRegion implements IDataRegionForQuery { }); } List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); + int count = 0; for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); @@ -3303,8 +3315,10 @@ public class DataRegion implements IDataRegionForQuery { // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + count++; } } + WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, count); PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 7966617104e..8ba2b7258d9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -80,6 +80,8 @@ public enum Metric { REJECT_THRESHOLD("reject_threshold"), TIMED_FLUSH_MEMTABLE_COUNT("timed_flush_memtable_count"), WAL_FLUSH_MEMTABLE_COUNT("wal_flush_memtable_count"), + MANUAL_FLUSH_MEMTABLE_COUNT("manual_flush_memtable_count"), + MEM_CONTROL_FLUSH_MEMTABLE_COUNT("mem_control_flush_memtable_count"), SERIES_FULL_FLUSH_MEMTABLE("series_full_flush_memtable"), ACTIVE_MEMTABLE_COUNT("active_memtable_count"), ACTIVE_TIME_PARTITION_COUNT("active_time_partition_count"),
