This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch add_flush_metric
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 76e2b12cacb27d8f03fbb254677e6e0808d9bd7b
Author: HTHou <[email protected]>
AuthorDate: Tue Jun 25 12:04:40 2024 +0800

    Add flush metric
---
 .../iotdb/db/service/metrics/WritingMetrics.java   | 60 ++++++++++++++++++++++
 .../iotdb/db/storageengine/StorageEngine.java      |  4 ++
 .../db/storageengine/dataregion/DataRegion.java    | 14 +++++
 .../iotdb/commons/service/metric/enums/Metric.java |  2 +
 4 files changed, 80 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index efe2dacda2d..bf6ce7d0bd3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -345,6 +345,8 @@ public class WritingMetrics implements IMetricSet {
     allDataRegions.forEach(this::createDataRegionMemoryCostMetrics);
     allDataRegionIds.forEach(this::createFlushingMemTableStatusMetrics);
     
allDataRegionIds.forEach(this::createSeriesFullFlushMemTableCounterMetrics);
+    allDataRegionIds.forEach(this::createManualFlushMemTableCounterMetrics);
+    
allDataRegionIds.forEach(this::createMemControlFlushMemTableCounterMetrics);
     allDataRegionIds.forEach(this::createTimedFlushMemTableCounterMetrics);
     allDataRegionIds.forEach(this::createWalFlushMemTableCounterMetrics);
     allDataRegionIds.forEach(this::createActiveMemtableCounterMetrics);
@@ -371,6 +373,8 @@ public class WritingMetrics implements IMetricSet {
           removeSeriesFullFlushMemTableCounterMetrics(dataRegionId);
           removeTimedFlushMemTableCounterMetrics(dataRegionId);
           removeWalFlushMemTableCounterMetrics(dataRegionId);
+          removeManualFlushMemTableCounterMetrics(dataRegionId);
+          removeMemControlFlushMemTableCounterMetrics(dataRegionId);
           removeActiveMemtableCounterMetrics(dataRegionId);
         });
     removeActiveTimePartitionCounterMetrics();
@@ -481,6 +485,24 @@ public class WritingMetrics implements IMetricSet {
             dataRegionId.toString());
   }
 
+  public void createManualFlushMemTableCounterMetrics(DataRegionId 
dataRegionId) {
+    MetricService.getInstance()
+        .getOrCreateCounter(
+            Metric.MANUAL_FLUSH_MEMTABLE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.REGION.toString(),
+            dataRegionId.toString());
+  }
+
+  public void createMemControlFlushMemTableCounterMetrics(DataRegionId 
dataRegionId) {
+    MetricService.getInstance()
+        .getOrCreateCounter(
+            Metric.MEM_CONTROL_FLUSH_MEMTABLE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.REGION.toString(),
+            dataRegionId.toString());
+  }
+
   public void createActiveMemtableCounterMetrics(DataRegionId dataRegionId) {
     MetricService.getInstance()
         .getOrCreateCounter(
@@ -522,6 +544,24 @@ public class WritingMetrics implements IMetricSet {
             dataRegionId.toString());
   }
 
+  public void removeManualFlushMemTableCounterMetrics(DataRegionId 
dataRegionId) {
+    MetricService.getInstance()
+        .remove(
+            MetricType.COUNTER,
+            Metric.MANUAL_FLUSH_MEMTABLE_COUNT.toString(),
+            Tag.REGION.toString(),
+            dataRegionId.toString());
+  }
+
+  public void removeMemControlFlushMemTableCounterMetrics(DataRegionId 
dataRegionId) {
+    MetricService.getInstance()
+        .remove(
+            MetricType.COUNTER,
+            Metric.MEM_CONTROL_FLUSH_MEMTABLE_COUNT.toString(),
+            Tag.REGION.toString(),
+            dataRegionId.toString());
+  }
+
   public void removeActiveMemtableCounterMetrics(DataRegionId dataRegionId) {
     MetricService.getInstance()
         .remove(
@@ -793,6 +833,26 @@ public class WritingMetrics implements IMetricSet {
             dataRegionId);
   }
 
+  public void recordManualFlushMemTableCount(String dataRegionId, int number) {
+    MetricService.getInstance()
+        .count(
+            number,
+            Metric.MANUAL_FLUSH_MEMTABLE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.REGION.toString(),
+            dataRegionId);
+  }
+
+  public void recordMemControlFlushMemTableCount(String dataRegionId, int 
number) {
+    MetricService.getInstance()
+        .count(
+            number,
+            Metric.MEM_CONTROL_FLUSH_MEMTABLE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.REGION.toString(),
+            dataRegionId);
+  }
+
   public void recordActiveMemTableCount(String dataRegionId, int number) {
     MetricService.getInstance()
         .count(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index e30069f70f0..6d1f34c8999 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -416,6 +416,8 @@ public class StorageEngine implements IService {
     WRITING_METRICS.createSeriesFullFlushMemTableCounterMetrics(dataRegionId);
     WRITING_METRICS.createWalFlushMemTableCounterMetrics(dataRegionId);
     WRITING_METRICS.createTimedFlushMemTableCounterMetrics(dataRegionId);
+    WRITING_METRICS.createManualFlushMemTableCounterMetrics(dataRegionId);
+    WRITING_METRICS.createMemControlFlushMemTableCounterMetrics(dataRegionId);
     WRITING_METRICS.createActiveMemtableCounterMetrics(dataRegionId);
     dataRegion.setCustomFlushListeners(customFlushListeners);
     dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
@@ -709,6 +711,8 @@ public class StorageEngine implements IService {
       WRITING_METRICS.removeWalFlushMemTableCounterMetrics(regionId);
       WRITING_METRICS.removeTimedFlushMemTableCounterMetrics(regionId);
       WRITING_METRICS.removeSeriesFullFlushMemTableCounterMetrics(regionId);
+      WRITING_METRICS.removeManualFlushMemTableCounterMetrics(regionId);
+      WRITING_METRICS.removeMemControlFlushMemTableCounterMetrics(regionId);
       try {
         region.abortCompaction();
         region.syncDeleteDataFiles();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8c473652c93..15a70b10980 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -914,6 +914,7 @@ public class DataRegion implements IDataRegionForQuery {
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+        
WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 
1);
       }
     } finally {
       writeUnlock();
@@ -1089,6 +1090,7 @@ public class DataRegion implements IDataRegionForQuery {
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
       fileFlushPolicy.apply(this, tsFileProcessor, sequence);
+      
WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 
1);
     }
     return true;
   }
@@ -1206,6 +1208,7 @@ public class DataRegion implements IDataRegionForQuery {
           });
     }
 
+    int count = 0;
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
     for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
       TsFileProcessor tsFileProcessor = entry.getKey();
@@ -1224,8 +1227,10 @@ public class DataRegion implements IDataRegionForQuery {
       // check memtable size and may asyncTryToFlush the work memtable
       if (entry.getKey().shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+        count++;
       }
     }
+    
WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 
count);
 
     
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
@@ -1323,6 +1328,7 @@ public class DataRegion implements IDataRegionForQuery {
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor.shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+        
WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 
1);
       }
     } finally {
       writeUnlock();
@@ -1490,6 +1496,7 @@ public class DataRegion implements IDataRegionForQuery {
             e);
       }
     }
+    WritingMetrics.getInstance().recordManualFlushMemTableCount(dataRegionId, 
1);
   }
 
   /**
@@ -1731,21 +1738,25 @@ public class DataRegion implements IDataRegionForQuery {
   List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
     writeLock("asyncCloseAllWorkingTsFileProcessors");
     List<Future<?>> futures = new ArrayList<>();
+    int count = 0;
     try {
       logger.info("async force close all files in database: {}", databaseName 
+ "-" + dataRegionId);
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor :
           new ArrayList<>(workSequenceTsFileProcessors.values())) {
         futures.add(asyncCloseOneTsFileProcessor(true, tsFileProcessor));
+        count++;
       }
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor :
           new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
         futures.add(asyncCloseOneTsFileProcessor(false, tsFileProcessor));
+        count++;
       }
     } finally {
       writeUnlock();
     }
+    WritingMetrics.getInstance().recordManualFlushMemTableCount(dataRegionId, 
count);
     return futures;
   }
 
@@ -3286,6 +3297,7 @@ public class DataRegion implements IDataRegionForQuery {
             });
       }
       List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
+      int count = 0;
       for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
         TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
@@ -3303,8 +3315,10 @@ public class DataRegion implements IDataRegionForQuery {
         // check memtable size and may asyncTryToFlush the work memtable
         if (tsFileProcessor.shouldFlush()) {
           fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+          count++;
         }
       }
+      
WritingMetrics.getInstance().recordMemControlFlushMemTableCount(dataRegionId, 
count);
 
       
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 7966617104e..8ba2b7258d9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -80,6 +80,8 @@ public enum Metric {
   REJECT_THRESHOLD("reject_threshold"),
   TIMED_FLUSH_MEMTABLE_COUNT("timed_flush_memtable_count"),
   WAL_FLUSH_MEMTABLE_COUNT("wal_flush_memtable_count"),
+  MANUAL_FLUSH_MEMTABLE_COUNT("manual_flush_memtable_count"),
+  MEM_CONTROL_FLUSH_MEMTABLE_COUNT("mem_control_flush_memtable_count"),
   SERIES_FULL_FLUSH_MEMTABLE("series_full_flush_memtable"),
   ACTIVE_MEMTABLE_COUNT("active_memtable_count"),
   ACTIVE_TIME_PARTITION_COUNT("active_time_partition_count"),

Reply via email to