This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch fix/flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fix/flush by this push:
new 282184058ef Fix concurrent problem
282184058ef is described below
commit 282184058ef0c242bd028ec29bb47f47d3fb29d9
Author: spricoder <[email protected]>
AuthorDate: Tue Aug 22 00:28:26 2023 +0800
Fix concurrent problem
---
.../dataregion/flush/MemTableFlushTask.java | 75 +++++++++++++---------
1 file changed, 44 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 141c18d7a04..5f1cea00c48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -61,6 +61,8 @@ public class MemTableFlushTask {
FlushSubTaskPoolManager.getInstance();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
private static IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ /* storage group name -> <latest time, points>*/
+ private static final Map<String, Pair<Long, Long>> flushPointsCache = new ConcurrentHashMap<>();
private final Future<?> encodingTaskFuture;
private final Future<?> ioTaskFuture;
private RestorableTsFileIOWriter writer;
@@ -70,7 +72,6 @@ public class MemTableFlushTask {
(config.isEnableMemControl() &&
SystemInfo.getInstance().isEncodingFasterThanIo())
? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
: new LinkedBlockingQueue<>();
- private final Map<String, Pair<Long, Long>> flushPointsCache;
private String storageGroup;
private String dataRegionId;
@@ -96,7 +97,6 @@ public class MemTableFlushTask {
this.dataRegionId = dataRegionId;
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
- this.flushPointsCache = new ConcurrentHashMap<>();
LOGGER.debug(
"flush task of database {} memtable is created, flushing to file {}.",
storageGroup,
@@ -283,35 +283,7 @@ public class MemTableFlushTask {
Thread.currentThread().interrupt();
}
- if (!storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
- int lastIndex = storageGroup.lastIndexOf("-");
- if (lastIndex == -1) {
- lastIndex = storageGroup.length();
- }
- String storageGroupName = storageGroup.substring(0, lastIndex);
- long currentTime = DateTimeUtils.currentTime();
- long points = memTable.getTotalPointsNum();
- Pair<Long, Long> previousPair = flushPointsCache.get(storageGroupName);
- if (previousPair != null) {
- if (previousPair.left == currentTime) {
- points += previousPair.right;
- } else {
- flushPointsCache.put(storageGroupName, new Pair<>(currentTime, points));
- }
- }
- MetricService.getInstance()
- .gaugeWithInternalReportAsync(
- points,
- Metric.POINTS.toString(),
- MetricLevel.CORE,
- currentTime,
- Tag.DATABASE.toString(),
- storageGroup.substring(0, lastIndex),
- Tag.TYPE.toString(),
- "flush",
- Tag.REGION.toString(),
- dataRegionId);
- }
+ recordFlushPointsMetric();
LOGGER.info(
"Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.",
@@ -322,6 +294,47 @@ public class MemTableFlushTask {
}
};
+ private void recordFlushPointsMetric() {
+ if (storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
+ return;
+ }
+ int lastIndex = storageGroup.lastIndexOf("-");
+ if (lastIndex == -1) {
+ lastIndex = storageGroup.length();
+ }
+ String storageGroupName = storageGroup.substring(0, lastIndex);
+ long currentTime = DateTimeUtils.currentTime();
+ long currentPoints = memTable.getTotalPointsNum();
+ // compute the flush points
+ long points =
+ flushPointsCache.compute(
+ storageGroupName,
+ (storageGroup, previousPair) -> {
+ if (previousPair == null || previousPair.left != currentTime) {
+ // if previousPair is null or previousPair.latestTime not equals currentTime,
+ // then create a new pair
+ return new Pair<>(currentTime, currentPoints);
+ } else {
+ // if previousPair.latestTime equals currentTime, then accumulate the points
+ return new Pair<>(currentTime, previousPair.right + currentPoints);
+ }
+ })
+ .right;
+ // record the flush points
+ MetricService.getInstance()
+ .gaugeWithInternalReportAsync(
+ points,
+ Metric.POINTS.toString(),
+ MetricLevel.CORE,
+ currentTime,
+ Tag.DATABASE.toString(),
+ storageGroup.substring(0, lastIndex),
+ Tag.TYPE.toString(),
+ "flush",
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
+
/** io task (third task of pipeline) */
@SuppressWarnings("squid:S135")
private Runnable ioTask =