This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c5b8a59c477 Active Load: Add metrics for file size and refactor the 
code (#13329)
c5b8a59c477 is described below

commit c5b8a59c477fe0bf39a3a8359752f295cdb449c9
Author: YC27 <[email protected]>
AuthorDate: Fri Aug 30 18:19:35 2024 +0800

    Active Load: Add metrics for file size and refactor the code (#13329)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/service/metrics/DataNodeMetricsHelper.java  |   6 +-
 .../load/active/ActiveLoadDirScanner.java          |  39 ++++-
 .../load/active/ActiveLoadPendingQueue.java        |  10 +-
 .../load/active/ActiveLoadTsFileLoader.java        |  20 ++-
 .../load/metrics/ActiveLoadingFilesMetricsSet.java | 175 +++++++++++++--------
 ...ava => ActiveLoadingFilesNumberMetricsSet.java} |  84 +++++-----
 .../metrics/ActiveLoadingFilesSizeMetricsSet.java  |  69 ++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |   3 +-
 8 files changed, 281 insertions(+), 125 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index d9d86001b9a..17463aa357f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -39,7 +39,8 @@ import 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
-import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
 import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
@@ -98,7 +99,8 @@ public class DataNodeMetricsHelper {
 
     // bind load related metrics
     
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
-    
MetricService.getInstance().addMetricSet(ActiveLoadingFilesMetricsSet.getInstance());
+    
MetricService.getInstance().addMetricSet(ActiveLoadingFilesNumberMetricsSet.getInstance());
+    
MetricService.getInstance().addMetricSet(ActiveLoadingFilesSizeMetricsSet.getInstance());
   }
 
   private static void initSystemMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 22375261e13..259ee7079c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -20,7 +20,8 @@
 package org.apache.iotdb.db.storageengine.load.active;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tsfile.common.conf.TSFileConfig;
@@ -113,13 +114,15 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
           }
         }
       }
-
       // Hot reload active load listening dir for pipe data sync
       // Active load is always enabled for pipe data sync
       listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
 
       // Create directories if not exists
       listeningDirs.forEach(this::createDirectoriesIfNotExists);
+
+      
ActiveLoadingFilesNumberMetricsSet.getInstance().updatePendingDirList(listeningDirs);
+      
ActiveLoadingFilesSizeMetricsSet.getInstance().updatePendingDirList(listeningDirs);
     } catch (final Exception e) {
       LOGGER.warn(
           "Error occurred during hot reload active load dirs. "
@@ -147,23 +150,45 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
 
   // Metrics
   public long countAndReportActiveListeningDirsFileNumber() {
-    final long[] fileCount = {0};
+    long totalFileCount = 0;
+    long totalFileSize = 0;
+
     try {
-      for (String dir : listeningDirs) {
+      for (final String dir : listeningDirs) {
+        final long[] fileCountInDir = {0};
+        final long[] fileSizeInDir = {0};
+
         Files.walkFileTree(
             new File(dir).toPath(),
             new SimpleFileVisitor<Path>() {
               @Override
               public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) {
-                fileCount[0]++;
+                fileCountInDir[0]++;
+                try {
+                  fileSizeInDir[0] += file.toFile().length();
+                } catch (Exception e) {
+                  LOGGER.debug("Failed to count active listening dirs file 
number.", e);
+                }
                 return FileVisitResult.CONTINUE;
               }
             });
+
+        ActiveLoadingFilesNumberMetricsSet.getInstance()
+            .updatePendingFileCounterInDir(dir, fileCountInDir[0]);
+        ActiveLoadingFilesSizeMetricsSet.getInstance()
+            .updatePendingFileCounterInDir(dir, fileSizeInDir[0]);
+
+        totalFileCount += fileCountInDir[0];
+        totalFileSize += fileSizeInDir[0];
       }
-      
ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);
+
+      ActiveLoadingFilesNumberMetricsSet.getInstance()
+          .updateTotalPendingFileCounter(totalFileCount);
+      
ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalPendingFileCounter(totalFileSize);
     } catch (final IOException e) {
       LOGGER.debug("Failed to count active listening dirs file number.", e);
     }
-    return fileCount[0];
+
+    return totalFileCount;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index f04d846c6ca..6c2b2cd41f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.load.active;
 
-import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
 
 import org.apache.tsfile.utils.Pair;
 
@@ -39,7 +39,7 @@ public class ActiveLoadPendingQueue {
     if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
       pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
 
-      ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(1);
+      
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
       return true;
     }
     return false;
@@ -51,8 +51,8 @@ public class ActiveLoadPendingQueue {
       pendingFileSet.remove(pair.left);
       loadingFileSet.add(pair.left);
 
-      ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(1);
-      ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(-1);
+      
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
+      
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
     }
     return pair;
   }
@@ -60,7 +60,7 @@ public class ActiveLoadPendingQueue {
   public synchronized void removeFromLoading(final String file) {
     loadingFileSet.remove(file);
 
-    ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
+    
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(-1);
   }
 
   public synchronized boolean isFilePendingOrLoading(final String file) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index e237dc2077f..61678779772 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -34,7 +34,8 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
-import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
+import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.io.FileUtils;
@@ -96,6 +97,9 @@ public class ActiveLoadTsFileLoader {
                 e);
           }
           failDir.set(IOTDB_CONFIG.getLoadActiveListeningFailDir());
+
+          
ActiveLoadingFilesSizeMetricsSet.getInstance().updateFailedDir(failDir.get());
+          
ActiveLoadingFilesNumberMetricsSet.getInstance().updateFailedDir(failDir.get());
         }
       }
     }
@@ -262,6 +266,8 @@ public class ActiveLoadTsFileLoader {
   // Metrics
   public long countAndReportFailedFileNumber() {
     final long[] fileCount = {0};
+    final long[] fileSize = {0};
+
     try {
       initFailDirIfNecessary();
       Files.walkFileTree(
@@ -270,13 +276,21 @@ public class ActiveLoadTsFileLoader {
             @Override
             public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) {
               fileCount[0]++;
+              try {
+                fileSize[0] += file.toFile().length();
+              } catch (Exception e) {
+                LOGGER.debug("Failed to count failed files in fail 
directory.", e);
+              }
               return FileVisitResult.CONTINUE;
             }
           });
-      
ActiveLoadingFilesMetricsSet.getInstance().recordFailedFileCounter(fileCount[0]);
+
+      
ActiveLoadingFilesNumberMetricsSet.getInstance().updateTotalFailedFileCounter(fileCount[0]);
+      
ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalFailedFileCounter(fileSize[0]);
     } catch (final IOException e) {
-      LOGGER.warn("Failed to count failed files in fail directory.", e);
+      LOGGER.debug("Failed to count failed files in fail directory.", e);
     }
+
     return fileCount[0];
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
index aae166ef5d7..f93e8fc9299 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.load.metrics;
 
-import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
@@ -28,86 +27,138 @@ import org.apache.iotdb.metrics.type.Counter;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
-public class ActiveLoadingFilesMetricsSet implements IMetricSet {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-  private static final ActiveLoadingFilesMetricsSet INSTANCE = new 
ActiveLoadingFilesMetricsSet();
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReference;
 
-  public static final String PENDING = "pending";
-  public static final String QUEUING = "queuing";
-  public static final String LOADING = "loading";
-  public static final String FAILED = "failed";
+public abstract class ActiveLoadingFilesMetricsSet implements IMetricSet {
 
-  private ActiveLoadingFilesMetricsSet() {
-    // empty construct
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadingFilesMetricsSet.class);
+
+  protected static final String FAILED_PREFIX = "failed - ";
+  protected static final String PENDING_PREFIX = "pending - ";
+
+  protected AtomicReference<AbstractMetricService> metricService = new 
AtomicReference<>();
+
+  private final AtomicReference<String> failedDir = new AtomicReference<>();
+  private final Set<String> pendingDirs = new CopyOnWriteArraySet<>();
+
+  protected Counter totalFailedFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  protected Counter totalPendingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  protected Map<String, Counter> dir2PendingFileCounterMap = new 
ConcurrentHashMap<>();
+
+  public void updateTotalFailedFileCounter(final long number) {
+    totalFailedFileCounter.inc(number - totalFailedFileCounter.getCount());
+  }
+
+  public void updateTotalPendingFileCounter(final long number) {
+    totalPendingFileCounter.inc(number - totalPendingFileCounter.getCount());
+  }
+
+  public void updatePendingFileCounterInDir(final String dirName, final long 
number) {
+    final Counter counter = dir2PendingFileCounterMap.get(dirName);
+    if (counter == null) {
+      LOGGER.debug("Failed to update file counter, dir({}) does not exist", 
dirName);
+      return;
+    }
+    counter.inc(number - counter.getCount());
+  }
+
+  public void updatePendingDirList(final Set<String> givenListeningDirs) {
+    if (metricService.get() == null || Objects.equals(pendingDirs, 
givenListeningDirs)) {
+      return;
+    }
+
+    pendingDirs.clear();
+    pendingDirs.addAll(givenListeningDirs);
+
+    unbindDir2PendingFileCounters(metricService.get());
+    rebindDir2PendingFileCounters();
   }
 
-  private Counter pendingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
-  private Counter queuingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
-  private Counter loadingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
-  private Counter failedFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  protected void unbindDir2PendingFileCounters(final AbstractMetricService 
metricService) {
+    dir2PendingFileCounterMap
+        .keySet()
+        .forEach(
+            dir ->
+                metricService.remove(
+                    MetricType.COUNTER,
+                    getMetricName(),
+                    Tag.TYPE.toString(),
+                    PENDING_PREFIX + dir));
+    dir2PendingFileCounterMap.clear();
+  }
 
-  public void recordPendingFileCounter(final long number) {
-    pendingFileCounter.inc(number - pendingFileCounter.getCount());
+  private void rebindDir2PendingFileCounters() {
+    dir2PendingFileCounterMap.clear();
+    if (!pendingDirs.isEmpty()) {
+      for (String dir : pendingDirs) {
+        dir2PendingFileCounterMap.put(
+            dir,
+            metricService
+                .get()
+                .getOrCreateCounter(
+                    getMetricName(),
+                    MetricLevel.IMPORTANT,
+                    Tag.TYPE.toString(),
+                    PENDING_PREFIX + dir));
+      }
+    }
   }
 
-  public void recordQueuingFileCounter(final long number) {
-    queuingFileCounter.inc(number);
+  public void updateFailedDir(final String dirName) {
+    if (metricService.get() == null || Objects.equals(failedDir.get(), 
dirName)) {
+      return;
+    }
+
+    failedDir.set(dirName);
+
+    unbindFailedDirCounter(metricService.get());
+    rebindFailedDirCounter();
   }
 
-  public void recordLoadingFileCounter(final long number) {
-    loadingFileCounter.inc(number);
+  protected void unbindFailedDirCounter(final AbstractMetricService 
metricService) {
+    totalFailedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    metricService.remove(
+        MetricType.COUNTER, getMetricName(), Tag.TYPE.toString(), 
FAILED_PREFIX + failedDir.get());
   }
 
-  public void recordFailedFileCounter(final long number) {
-    failedFileCounter.inc(number - failedFileCounter.getCount());
+  private void rebindFailedDirCounter() {
+    totalFailedFileCounter =
+        metricService
+            .get()
+            .getOrCreateCounter(
+                getMetricName(),
+                MetricLevel.IMPORTANT,
+                Tag.TYPE.toString(),
+                FAILED_PREFIX + failedDir.get());
   }
 
   @Override
   public void bindTo(final AbstractMetricService metricService) {
-    pendingFileCounter =
-        metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.TYPE.toString(),
-            PENDING);
-    queuingFileCounter =
-        metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.TYPE.toString(),
-            QUEUING);
-    loadingFileCounter =
-        metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.TYPE.toString(),
-            LOADING);
-    failedFileCounter =
-        metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.TYPE.toString(),
-            FAILED);
+    this.metricService.set(metricService);
+
+    // Dir2PendingFileCounters' binding is triggered by updatePendingDirList
+    // FailedDirCounter's binding is triggered by updateFailedDir
+    bindOtherCounters(metricService);
   }
 
+  protected abstract void bindOtherCounters(final AbstractMetricService 
metricService);
+
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
-    pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-    queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-    loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-    failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-
-    metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), PENDING);
-    metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), QUEUING);
-    metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), LOADING);
-    metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), FAILED);
+    unbindDir2PendingFileCounters(metricService);
+    unbindFailedDirCounter(metricService);
+    unbindOtherCounters(metricService);
   }
 
-  public static ActiveLoadingFilesMetricsSet getInstance() {
-    return ActiveLoadingFilesMetricsSet.INSTANCE;
-  }
+  protected abstract void unbindOtherCounters(final AbstractMetricService 
metricService);
+
+  protected abstract String getMetricName();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java
index aae166ef5d7..11105ed14c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java
@@ -23,91 +23,85 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
-import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.type.Counter;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
-public class ActiveLoadingFilesMetricsSet implements IMetricSet {
+public class ActiveLoadingFilesNumberMetricsSet extends 
ActiveLoadingFilesMetricsSet {
 
-  private static final ActiveLoadingFilesMetricsSet INSTANCE = new 
ActiveLoadingFilesMetricsSet();
+  private static final String PENDING = "pending";
+  private static final String QUEUING = "queuing";
+  private static final String LOADING = "loading";
 
-  public static final String PENDING = "pending";
-  public static final String QUEUING = "queuing";
-  public static final String LOADING = "loading";
-  public static final String FAILED = "failed";
-
-  private ActiveLoadingFilesMetricsSet() {
-    // empty construct
-  }
-
-  private Counter pendingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
   private Counter queuingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
   private Counter loadingFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
-  private Counter failedFileCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
-
-  public void recordPendingFileCounter(final long number) {
-    pendingFileCounter.inc(number - pendingFileCounter.getCount());
-  }
 
-  public void recordQueuingFileCounter(final long number) {
+  public void increaseQueuingFileCounter(final long number) {
     queuingFileCounter.inc(number);
   }
 
-  public void recordLoadingFileCounter(final long number) {
+  public void increaseLoadingFileCounter(final long number) {
     loadingFileCounter.inc(number);
   }
 
-  public void recordFailedFileCounter(final long number) {
-    failedFileCounter.inc(number - failedFileCounter.getCount());
-  }
-
   @Override
-  public void bindTo(final AbstractMetricService metricService) {
-    pendingFileCounter =
+  protected void bindOtherCounters(final AbstractMetricService metricService) {
+    totalPendingFileCounter =
         metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
+            Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
             MetricLevel.IMPORTANT,
             Tag.TYPE.toString(),
             PENDING);
     queuingFileCounter =
         metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
+            Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
             MetricLevel.IMPORTANT,
             Tag.TYPE.toString(),
             QUEUING);
     loadingFileCounter =
         metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
+            Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
             MetricLevel.IMPORTANT,
             Tag.TYPE.toString(),
             LOADING);
-    failedFileCounter =
-        metricService.getOrCreateCounter(
-            Metric.ACTIVE_LOADING_FILES.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.TYPE.toString(),
-            FAILED);
   }
 
   @Override
-  public void unbindFrom(final AbstractMetricService metricService) {
-    pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+  protected void unbindOtherCounters(final AbstractMetricService 
metricService) {
+    totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
     queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
     loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-    failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
     metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), PENDING);
-    metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), QUEUING);
+        MetricType.COUNTER,
+        Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
+        Tag.TYPE.toString(),
+        PENDING);
     metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), LOADING);
+        MetricType.COUNTER,
+        Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
+        Tag.TYPE.toString(),
+        QUEUING);
     metricService.remove(
-        MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), 
Tag.TYPE.toString(), FAILED);
+        MetricType.COUNTER,
+        Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
+        Tag.TYPE.toString(),
+        LOADING);
+  }
+
+  @Override
+  protected String getMetricName() {
+    return Metric.ACTIVE_LOADING_FILES_NUMBER.toString();
   }
 
-  public static ActiveLoadingFilesMetricsSet getInstance() {
-    return ActiveLoadingFilesMetricsSet.INSTANCE;
+  public static ActiveLoadingFilesNumberMetricsSet getInstance() {
+    return ActiveLoadingFilesNumberMetricsSet.INSTANCE;
+  }
+
+  private static final ActiveLoadingFilesNumberMetricsSet INSTANCE =
+      new ActiveLoadingFilesNumberMetricsSet();
+
+  private ActiveLoadingFilesNumberMetricsSet() {
+    // empty construct
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java
new file mode 100644
index 00000000000..554918d981c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.metrics;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class ActiveLoadingFilesSizeMetricsSet extends 
ActiveLoadingFilesMetricsSet {
+
+  private static final String PENDING_SIZE = "pending (total)";
+
+  @Override
+  protected void bindOtherCounters(AbstractMetricService metricService) {
+    totalPendingFileCounter =
+        metricService.getOrCreateCounter(
+            Metric.ACTIVE_LOADING_FILES_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.TYPE.toString(),
+            PENDING_SIZE);
+  }
+
+  @Override
+  protected void unbindOtherCounters(AbstractMetricService metricService) {
+    totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.ACTIVE_LOADING_FILES_SIZE.toString(),
+        Tag.TYPE.toString(),
+        PENDING_SIZE);
+  }
+
+  @Override
+  protected String getMetricName() {
+    return Metric.ACTIVE_LOADING_FILES_SIZE.toString();
+  }
+
+  public static ActiveLoadingFilesSizeMetricsSet getInstance() {
+    return INSTANCE;
+  }
+
+  private static final ActiveLoadingFilesSizeMetricsSet INSTANCE =
+      new ActiveLoadingFilesSizeMetricsSet();
+
+  private ActiveLoadingFilesSizeMetricsSet() {
+    // empty construct
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 707f46fa259..7c2f4ac1016 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -172,7 +172,8 @@ public enum Metric {
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
   SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
   // load related
-  ACTIVE_LOADING_FILES("active_loading_files"),
+  ACTIVE_LOADING_FILES_NUMBER("active_loading_files_number"),
+  ACTIVE_LOADING_FILES_SIZE("active_loading_files_size"),
   LOAD_MEM("load_mem"),
   LOAD_DISK_IO("load_disk_io"),
   LOAD_TIME_COST("load_time_cost"),

Reply via email to