This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c5b8a59c477 Active Load: Add metrics for file size and refactor the
code (#13329)
c5b8a59c477 is described below
commit c5b8a59c477fe0bf39a3a8359752f295cdb449c9
Author: YC27 <[email protected]>
AuthorDate: Fri Aug 30 18:19:35 2024 +0800
Active Load: Add metrics for file size and refactor the code (#13329)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../db/service/metrics/DataNodeMetricsHelper.java | 6 +-
.../load/active/ActiveLoadDirScanner.java | 39 ++++-
.../load/active/ActiveLoadPendingQueue.java | 10 +-
.../load/active/ActiveLoadTsFileLoader.java | 20 ++-
.../load/metrics/ActiveLoadingFilesMetricsSet.java | 175 +++++++++++++--------
...ava => ActiveLoadingFilesNumberMetricsSet.java} | 84 +++++-----
.../metrics/ActiveLoadingFilesSizeMetricsSet.java | 69 ++++++++
.../iotdb/commons/service/metric/enums/Metric.java | 3 +-
8 files changed, 281 insertions(+), 125 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index d9d86001b9a..17463aa357f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -39,7 +39,8 @@ import
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
-import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
@@ -98,7 +99,8 @@ public class DataNodeMetricsHelper {
// bind load related metrics
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
-
MetricService.getInstance().addMetricSet(ActiveLoadingFilesMetricsSet.getInstance());
+
MetricService.getInstance().addMetricSet(ActiveLoadingFilesNumberMetricsSet.getInstance());
+
MetricService.getInstance().addMetricSet(ActiveLoadingFilesSizeMetricsSet.getInstance());
}
private static void initSystemMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 22375261e13..259ee7079c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.storageengine.load.active;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.common.conf.TSFileConfig;
@@ -113,13 +114,15 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
}
}
}
-
// Hot reload active load listening dir for pipe data sync
// Active load is always enabled for pipe data sync
listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
// Create directories if not exists
listeningDirs.forEach(this::createDirectoriesIfNotExists);
+
+
ActiveLoadingFilesNumberMetricsSet.getInstance().updatePendingDirList(listeningDirs);
+
ActiveLoadingFilesSizeMetricsSet.getInstance().updatePendingDirList(listeningDirs);
} catch (final Exception e) {
LOGGER.warn(
"Error occurred during hot reload active load dirs. "
@@ -147,23 +150,45 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
// Metrics
public long countAndReportActiveListeningDirsFileNumber() {
- final long[] fileCount = {0};
+ long totalFileCount = 0;
+ long totalFileSize = 0;
+
try {
- for (String dir : listeningDirs) {
+ for (final String dir : listeningDirs) {
+ final long[] fileCountInDir = {0};
+ final long[] fileSizeInDir = {0};
+
Files.walkFileTree(
new File(dir).toPath(),
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) {
- fileCount[0]++;
+ fileCountInDir[0]++;
+ try {
+ fileSizeInDir[0] += file.toFile().length();
+ } catch (Exception e) {
+ LOGGER.debug("Failed to count active listening dirs file
number.", e);
+ }
return FileVisitResult.CONTINUE;
}
});
+
+ ActiveLoadingFilesNumberMetricsSet.getInstance()
+ .updatePendingFileCounterInDir(dir, fileCountInDir[0]);
+ ActiveLoadingFilesSizeMetricsSet.getInstance()
+ .updatePendingFileCounterInDir(dir, fileSizeInDir[0]);
+
+ totalFileCount += fileCountInDir[0];
+ totalFileSize += fileSizeInDir[0];
}
-
ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);
+
+ ActiveLoadingFilesNumberMetricsSet.getInstance()
+ .updateTotalPendingFileCounter(totalFileCount);
+
ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalPendingFileCounter(totalFileSize);
} catch (final IOException e) {
LOGGER.debug("Failed to count active listening dirs file number.", e);
}
- return fileCount[0];
+
+ return totalFileCount;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index f04d846c6ca..6c2b2cd41f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.storageengine.load.active;
-import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import org.apache.tsfile.utils.Pair;
@@ -39,7 +39,7 @@ public class ActiveLoadPendingQueue {
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
- ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(1);
+
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
return true;
}
return false;
@@ -51,8 +51,8 @@ public class ActiveLoadPendingQueue {
pendingFileSet.remove(pair.left);
loadingFileSet.add(pair.left);
- ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(1);
- ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(-1);
+
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
+
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
}
return pair;
}
@@ -60,7 +60,7 @@ public class ActiveLoadPendingQueue {
public synchronized void removeFromLoading(final String file) {
loadingFileSet.remove(file);
- ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
+
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(-1);
}
public synchronized boolean isFilePendingOrLoading(final String file) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index e237dc2077f..61678779772 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -34,7 +34,8 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
-import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
+import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
@@ -96,6 +97,9 @@ public class ActiveLoadTsFileLoader {
e);
}
failDir.set(IOTDB_CONFIG.getLoadActiveListeningFailDir());
+
+
ActiveLoadingFilesSizeMetricsSet.getInstance().updateFailedDir(failDir.get());
+
ActiveLoadingFilesNumberMetricsSet.getInstance().updateFailedDir(failDir.get());
}
}
}
@@ -262,6 +266,8 @@ public class ActiveLoadTsFileLoader {
// Metrics
public long countAndReportFailedFileNumber() {
final long[] fileCount = {0};
+ final long[] fileSize = {0};
+
try {
initFailDirIfNecessary();
Files.walkFileTree(
@@ -270,13 +276,21 @@ public class ActiveLoadTsFileLoader {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) {
fileCount[0]++;
+ try {
+ fileSize[0] += file.toFile().length();
+ } catch (Exception e) {
+ LOGGER.debug("Failed to count failed files in fail
directory.", e);
+ }
return FileVisitResult.CONTINUE;
}
});
-
ActiveLoadingFilesMetricsSet.getInstance().recordFailedFileCounter(fileCount[0]);
+
+
ActiveLoadingFilesNumberMetricsSet.getInstance().updateTotalFailedFileCounter(fileCount[0]);
+
ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalFailedFileCounter(fileSize[0]);
} catch (final IOException e) {
- LOGGER.warn("Failed to count failed files in fail directory.", e);
+ LOGGER.debug("Failed to count failed files in fail directory.", e);
}
+
return fileCount[0];
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
index aae166ef5d7..f93e8fc9299 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.load.metrics;
-import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
@@ -28,86 +27,138 @@ import org.apache.iotdb.metrics.type.Counter;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
-public class ActiveLoadingFilesMetricsSet implements IMetricSet {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private static final ActiveLoadingFilesMetricsSet INSTANCE = new
ActiveLoadingFilesMetricsSet();
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReference;
- public static final String PENDING = "pending";
- public static final String QUEUING = "queuing";
- public static final String LOADING = "loading";
- public static final String FAILED = "failed";
+public abstract class ActiveLoadingFilesMetricsSet implements IMetricSet {
- private ActiveLoadingFilesMetricsSet() {
- // empty construct
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadingFilesMetricsSet.class);
+
+ protected static final String FAILED_PREFIX = "failed - ";
+ protected static final String PENDING_PREFIX = "pending - ";
+
+ protected AtomicReference<AbstractMetricService> metricService = new
AtomicReference<>();
+
+ private final AtomicReference<String> failedDir = new AtomicReference<>();
+ private final Set<String> pendingDirs = new CopyOnWriteArraySet<>();
+
+ protected Counter totalFailedFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ protected Counter totalPendingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ protected Map<String, Counter> dir2PendingFileCounterMap = new
ConcurrentHashMap<>();
+
+ public void updateTotalFailedFileCounter(final long number) {
+ totalFailedFileCounter.inc(number - totalFailedFileCounter.getCount());
+ }
+
+ public void updateTotalPendingFileCounter(final long number) {
+ totalPendingFileCounter.inc(number - totalPendingFileCounter.getCount());
+ }
+
+ public void updatePendingFileCounterInDir(final String dirName, final long
number) {
+ final Counter counter = dir2PendingFileCounterMap.get(dirName);
+ if (counter == null) {
+ LOGGER.debug("Failed to update file counter, dir({}) does not exist",
dirName);
+ return;
+ }
+ counter.inc(number - counter.getCount());
+ }
+
+ public void updatePendingDirList(final Set<String> givenListeningDirs) {
+ if (metricService.get() == null || Objects.equals(pendingDirs,
givenListeningDirs)) {
+ return;
+ }
+
+ pendingDirs.clear();
+ pendingDirs.addAll(givenListeningDirs);
+
+ unbindDir2PendingFileCounters(metricService.get());
+ rebindDir2PendingFileCounters();
}
- private Counter pendingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
- private Counter queuingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
- private Counter loadingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
- private Counter failedFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ protected void unbindDir2PendingFileCounters(final AbstractMetricService
metricService) {
+ dir2PendingFileCounterMap
+ .keySet()
+ .forEach(
+ dir ->
+ metricService.remove(
+ MetricType.COUNTER,
+ getMetricName(),
+ Tag.TYPE.toString(),
+ PENDING_PREFIX + dir));
+ dir2PendingFileCounterMap.clear();
+ }
- public void recordPendingFileCounter(final long number) {
- pendingFileCounter.inc(number - pendingFileCounter.getCount());
+ private void rebindDir2PendingFileCounters() {
+ dir2PendingFileCounterMap.clear();
+ if (!pendingDirs.isEmpty()) {
+ for (String dir : pendingDirs) {
+ dir2PendingFileCounterMap.put(
+ dir,
+ metricService
+ .get()
+ .getOrCreateCounter(
+ getMetricName(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ PENDING_PREFIX + dir));
+ }
+ }
}
- public void recordQueuingFileCounter(final long number) {
- queuingFileCounter.inc(number);
+ public void updateFailedDir(final String dirName) {
+ if (metricService.get() == null || Objects.equals(failedDir.get(),
dirName)) {
+ return;
+ }
+
+ failedDir.set(dirName);
+
+ unbindFailedDirCounter(metricService.get());
+ rebindFailedDirCounter();
}
- public void recordLoadingFileCounter(final long number) {
- loadingFileCounter.inc(number);
+ protected void unbindFailedDirCounter(final AbstractMetricService
metricService) {
+ totalFailedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ metricService.remove(
+ MetricType.COUNTER, getMetricName(), Tag.TYPE.toString(),
FAILED_PREFIX + failedDir.get());
}
- public void recordFailedFileCounter(final long number) {
- failedFileCounter.inc(number - failedFileCounter.getCount());
+ private void rebindFailedDirCounter() {
+ totalFailedFileCounter =
+ metricService
+ .get()
+ .getOrCreateCounter(
+ getMetricName(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ FAILED_PREFIX + failedDir.get());
}
@Override
public void bindTo(final AbstractMetricService metricService) {
- pendingFileCounter =
- metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
- MetricLevel.IMPORTANT,
- Tag.TYPE.toString(),
- PENDING);
- queuingFileCounter =
- metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
- MetricLevel.IMPORTANT,
- Tag.TYPE.toString(),
- QUEUING);
- loadingFileCounter =
- metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
- MetricLevel.IMPORTANT,
- Tag.TYPE.toString(),
- LOADING);
- failedFileCounter =
- metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
- MetricLevel.IMPORTANT,
- Tag.TYPE.toString(),
- FAILED);
+ this.metricService.set(metricService);
+
+ // Dir2PendingFileCounters' binding is triggered by updatePendingDirList
+ // FailedDirCounter's binding is triggered by updateFailedDir
+ bindOtherCounters(metricService);
}
+ protected abstract void bindOtherCounters(final AbstractMetricService
metricService);
+
@Override
public void unbindFrom(final AbstractMetricService metricService) {
- pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
- queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
- loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
- failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
-
- metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), PENDING);
- metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), QUEUING);
- metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), LOADING);
- metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), FAILED);
+ unbindDir2PendingFileCounters(metricService);
+ unbindFailedDirCounter(metricService);
+ unbindOtherCounters(metricService);
}
- public static ActiveLoadingFilesMetricsSet getInstance() {
- return ActiveLoadingFilesMetricsSet.INSTANCE;
- }
+ protected abstract void unbindOtherCounters(final AbstractMetricService
metricService);
+
+ protected abstract String getMetricName();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java
index aae166ef5d7..11105ed14c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java
@@ -23,91 +23,85 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
-import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Counter;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
-public class ActiveLoadingFilesMetricsSet implements IMetricSet {
+public class ActiveLoadingFilesNumberMetricsSet extends
ActiveLoadingFilesMetricsSet {
- private static final ActiveLoadingFilesMetricsSet INSTANCE = new
ActiveLoadingFilesMetricsSet();
+ private static final String PENDING = "pending";
+ private static final String QUEUING = "queuing";
+ private static final String LOADING = "loading";
- public static final String PENDING = "pending";
- public static final String QUEUING = "queuing";
- public static final String LOADING = "loading";
- public static final String FAILED = "failed";
-
- private ActiveLoadingFilesMetricsSet() {
- // empty construct
- }
-
- private Counter pendingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private Counter queuingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private Counter loadingFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
- private Counter failedFileCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
-
- public void recordPendingFileCounter(final long number) {
- pendingFileCounter.inc(number - pendingFileCounter.getCount());
- }
- public void recordQueuingFileCounter(final long number) {
+ public void increaseQueuingFileCounter(final long number) {
queuingFileCounter.inc(number);
}
- public void recordLoadingFileCounter(final long number) {
+ public void increaseLoadingFileCounter(final long number) {
loadingFileCounter.inc(number);
}
- public void recordFailedFileCounter(final long number) {
- failedFileCounter.inc(number - failedFileCounter.getCount());
- }
-
@Override
- public void bindTo(final AbstractMetricService metricService) {
- pendingFileCounter =
+ protected void bindOtherCounters(final AbstractMetricService metricService) {
+ totalPendingFileCounter =
metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
+ Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
MetricLevel.IMPORTANT,
Tag.TYPE.toString(),
PENDING);
queuingFileCounter =
metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
+ Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
MetricLevel.IMPORTANT,
Tag.TYPE.toString(),
QUEUING);
loadingFileCounter =
metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
+ Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
MetricLevel.IMPORTANT,
Tag.TYPE.toString(),
LOADING);
- failedFileCounter =
- metricService.getOrCreateCounter(
- Metric.ACTIVE_LOADING_FILES.toString(),
- MetricLevel.IMPORTANT,
- Tag.TYPE.toString(),
- FAILED);
}
@Override
- public void unbindFrom(final AbstractMetricService metricService) {
- pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ protected void unbindOtherCounters(final AbstractMetricService
metricService) {
+ totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
- failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), PENDING);
- metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), QUEUING);
+ MetricType.COUNTER,
+ Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
+ Tag.TYPE.toString(),
+ PENDING);
metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), LOADING);
+ MetricType.COUNTER,
+ Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
+ Tag.TYPE.toString(),
+ QUEUING);
metricService.remove(
- MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(),
Tag.TYPE.toString(), FAILED);
+ MetricType.COUNTER,
+ Metric.ACTIVE_LOADING_FILES_NUMBER.toString(),
+ Tag.TYPE.toString(),
+ LOADING);
+ }
+
+ @Override
+ protected String getMetricName() {
+ return Metric.ACTIVE_LOADING_FILES_NUMBER.toString();
}
- public static ActiveLoadingFilesMetricsSet getInstance() {
- return ActiveLoadingFilesMetricsSet.INSTANCE;
+ public static ActiveLoadingFilesNumberMetricsSet getInstance() {
+ return ActiveLoadingFilesNumberMetricsSet.INSTANCE;
+ }
+
+ private static final ActiveLoadingFilesNumberMetricsSet INSTANCE =
+ new ActiveLoadingFilesNumberMetricsSet();
+
+ private ActiveLoadingFilesNumberMetricsSet() {
+ // empty construct
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java
new file mode 100644
index 00000000000..554918d981c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.metrics;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class ActiveLoadingFilesSizeMetricsSet extends
ActiveLoadingFilesMetricsSet {
+
+ private static final String PENDING_SIZE = "pending (total)";
+
+ @Override
+ protected void bindOtherCounters(AbstractMetricService metricService) {
+ totalPendingFileCounter =
+ metricService.getOrCreateCounter(
+ Metric.ACTIVE_LOADING_FILES_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.TYPE.toString(),
+ PENDING_SIZE);
+ }
+
+ @Override
+ protected void unbindOtherCounters(AbstractMetricService metricService) {
+ totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.ACTIVE_LOADING_FILES_SIZE.toString(),
+ Tag.TYPE.toString(),
+ PENDING_SIZE);
+ }
+
+ @Override
+ protected String getMetricName() {
+ return Metric.ACTIVE_LOADING_FILES_SIZE.toString();
+ }
+
+ public static ActiveLoadingFilesSizeMetricsSet getInstance() {
+ return INSTANCE;
+ }
+
+ private static final ActiveLoadingFilesSizeMetricsSet INSTANCE =
+ new ActiveLoadingFilesSizeMetricsSet();
+
+ private ActiveLoadingFilesSizeMetricsSet() {
+ // empty construct
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 707f46fa259..7c2f4ac1016 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -172,7 +172,8 @@ public enum Metric {
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
// load related
- ACTIVE_LOADING_FILES("active_loading_files"),
+ ACTIVE_LOADING_FILES_NUMBER("active_loading_files_number"),
+ ACTIVE_LOADING_FILES_SIZE("active_loading_files_size"),
LOAD_MEM("load_mem"),
LOAD_DISK_IO("load_disk_io"),
LOAD_TIME_COST("load_time_cost"),