This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1dce787c39e [Metric] Fix flush point statistics (#10915)
1dce787c39e is described below
commit 1dce787c39e14973918ee390c470b901b68be73e
Author: ZhangHongYin <[email protected]>
AuthorDate: Tue Aug 22 19:18:30 2023 +0800
[Metric] Fix flush point statistics (#10915)
---
.../dataregion/flush/MemTableFlushTask.java | 58 +++++++++++++++-------
.../iotdb/metrics/AbstractMetricService.java | 45 +++++++++--------
.../metrics/reporter/iotdb/IoTDBReporter.java | 6 +--
3 files changed, 69 insertions(+), 40 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 05241fea5e6..19450cbe9fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
import
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -59,6 +61,8 @@ public class MemTableFlushTask {
FlushSubTaskPoolManager.getInstance();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
private static IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ /* storage group name -> timestamp used by the most recent flush-points report */
+ private static final Map<String, Long> flushPointsCache = new
ConcurrentHashMap<>();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
@@ -279,23 +283,7 @@ public class MemTableFlushTask {
Thread.currentThread().interrupt();
}
- if (!storageGroup.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
- int lastIndex = storageGroup.lastIndexOf("-");
- if (lastIndex == -1) {
- lastIndex = storageGroup.length();
- }
- MetricService.getInstance()
- .gaugeWithInternalReportAsync(
- memTable.getTotalPointsNum(),
- Metric.POINTS.toString(),
- MetricLevel.CORE,
- Tag.DATABASE.toString(),
- storageGroup.substring(0, lastIndex),
- Tag.TYPE.toString(),
- "flush",
- Tag.REGION.toString(),
- dataRegionId);
- }
+ recordFlushPointsMetric();
LOGGER.info(
"Database {}, flushing memtable {} into disk: Encoding data cost
" + "{} ms.",
@@ -306,6 +294,42 @@ public class MemTableFlushTask {
}
};
+ /**
+ * Reports the total point count of the flushed memtable as the POINTS gauge, tagged with the
+ * database name, the "flush" type and the data region id. Storage groups whose name starts
+ * with SchemaConstant.SYSTEM_DATABASE are not reported.
+ *
+ * <p>Every report is written with an explicit timestamp that is strictly greater than the
+ * previous one used for the same database name, so consecutive flushes that observe the same
+ * clock reading never reuse a timestamp.
+ */
+ private void recordFlushPointsMetric() {
+ if (storageGroup.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
+ return;
+ }
+ // keep everything before the last '-' (or the whole name when there is none)
+ int lastIndex = storageGroup.lastIndexOf("-");
+ if (lastIndex == -1) {
+ lastIndex = storageGroup.length();
+ }
+ String storageGroupName = storageGroup.substring(0, lastIndex);
+ long currentTime = DateTimeUtils.currentTime();
+ // choose the write time atomically per database: use the current time when it is newer than
+ // the last one handed out, otherwise advance the last one by one. Comparing against the
+ // cached value (not just for equality) keeps the sequence strictly increasing even when
+ // three or more flushes share one clock reading.
+ long writeTime =
+ flushPointsCache.compute(
+ storageGroupName,
+ (database, lastTime) -> {
+ if (lastTime == null || lastTime < currentTime) {
+ return currentTime;
+ } else {
+ return lastTime + 1;
+ }
+ });
+ // record the flush points
+ MetricService.getInstance()
+ .gaugeWithInternalReportAsync(
+ memTable.getTotalPointsNum(),
+ Metric.POINTS.toString(),
+ MetricLevel.CORE,
+ writeTime,
+ Tag.DATABASE.toString(),
+ storageGroupName,
+ Tag.TYPE.toString(),
+ "flush",
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
+
/** io task (third task of pipeline) */
@SuppressWarnings("squid:S135")
private Runnable ioTask =
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
index 5ac5a3737f2..06e5632ae6a 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
@@ -271,9 +271,9 @@ public abstract class AbstractMetricService {
/** GetOrCreateCounter with internal report. */
public Counter getOrCreateCounterWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Counter counter = metricManager.getOrCreateCounter(metric, metricLevel,
tags);
- internalReporter.writeMetricToIoTDB(counter, metric, tags);
+ internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
return counter;
}
@@ -287,69 +287,74 @@ public abstract class AbstractMetricService {
/** GetOrCreateGauge with internal report. */
public Gauge getOrCreateGaugeWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Gauge gauge = metricManager.getOrCreateGauge(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(gauge, metric, tags);
+ internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
return gauge;
}
/** GetOrCreateRate with internal report. */
public Rate getOrCreateRateWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Rate rate = metricManager.getOrCreateRate(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(rate, metric, tags);
+ internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
return rate;
}
/** GetOrCreateHistogram with internal report. */
public Histogram getOrCreateHistogramWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Histogram histogram = metricManager.getOrCreateHistogram(metric,
metricLevel, tags);
- internalReporter.writeMetricToIoTDB(histogram, metric, tags);
+ internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
return histogram;
}
/** GetOrCreateTimer with internal report. */
public Timer getOrCreateTimerWithInternalReport(
- String metric, MetricLevel metricLevel, String... tags) {
+ String metric, MetricLevel metricLevel, long time, String... tags) {
Timer timer = metricManager.getOrCreateTimer(metric, metricLevel, tags);
- internalReporter.writeMetricToIoTDB(timer, metric, tags);
+ internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
return timer;
}
/** Count with internal report. */
public void countWithInternalReportAsync(
- long delta, String metric, MetricLevel metricLevel, String... tags) {
+ long delta, String metric, MetricLevel metricLevel, long time, String...
tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.count(delta, metric, metricLevel, tags), metric, tags);
+ metricManager.count(delta, metric, metricLevel, tags), metric, time,
tags);
}
/** Gauge value with internal report. */
public void gaugeWithInternalReportAsync(
- long value, String metric, MetricLevel metricLevel, String... tags) {
+ long value, String metric, MetricLevel metricLevel, long time, String...
tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.gauge(value, metric, metricLevel, tags), metric, tags);
+ metricManager.gauge(value, metric, metricLevel, tags), metric, time,
tags);
}
/** Rate with internal report. */
public void rateWithInternalReportAsync(
- long value, String metric, MetricLevel metricLevel, String... tags) {
+ long value, String metric, MetricLevel metricLevel, long time, String...
tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.rate(value, metric, metricLevel, tags), metric, tags);
+ metricManager.rate(value, metric, metricLevel, tags), metric, time,
tags);
}
/** Histogram with internal report. */
public void histogramWithInternalReportAsync(
- long value, String metric, MetricLevel metricLevel, String... tags) {
+ long value, String metric, MetricLevel metricLevel, long time, String...
tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.histogram(value, metric, metricLevel, tags), metric,
tags);
+ metricManager.histogram(value, metric, metricLevel, tags), metric,
time, tags);
}
/** Timer with internal report. */
public void timerWithInternalReportAsync(
- long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel,
String... tags) {
+ long delta,
+ TimeUnit timeUnit,
+ String metric,
+ MetricLevel metricLevel,
+ long time,
+ String... tags) {
internalReporter.writeMetricToIoTDB(
- metricManager.timer(delta, timeUnit, metric, metricLevel, tags),
metric, tags);
+ metricManager.timer(delta, timeUnit, metric, metricLevel, tags),
metric, time, tags);
}
public List<Pair<String, String[]>> getAllMetricKeys() {
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
index 2cc67f13a31..db1e915c8e9 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
@@ -36,14 +36,14 @@ public abstract class IoTDBReporter implements Reporter {
*
* @param metric the target metric
* @param name the name of metric
+ * @param time the target time of metric
* @param tags the tags of metric
*/
- public void writeMetricToIoTDB(IMetric metric, String name, String... tags) {
+ public void writeMetricToIoTDB(IMetric metric, String name, long time,
String... tags) {
if (!(metric instanceof DoNothingMetric)) {
Map<String, Object> values = new HashMap<>();
metric.constructValueMap(values);
- writeMetricToIoTDB(
- values, IoTDBMetricsUtils.generatePath(name, tags),
System.currentTimeMillis());
+ writeMetricToIoTDB(values, IoTDBMetricsUtils.generatePath(name, tags),
time);
}
}