This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 88549541f6b Add a param to switch whether to enable data separation or not (#11692)
88549541f6b is described below
commit 88549541f6bf53717169294a2fe2b222d05ddede
Author: 周沛辰 <[email protected]>
AuthorDate: Thu Dec 14 21:45:02 2023 -0600
Add a param to switch whether to enable data separation or not (#11692)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +-
.../db/storageengine/dataregion/DataRegion.java | 178 +++++++++++----------
.../dataregion/HashLastFlushTimeMap.java | 115 ++++---------
.../dataregion/ILastFlushTimeMap.java | 31 ++--
.../storageengine/dataregion/DataRegionTest.java | 48 +++---
.../resources/conf/iotdb-common.properties | 6 +-
7 files changed, 173 insertions(+), 224 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 69d6b06f60d..e921a374ef6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -839,7 +839,11 @@ public class IoTDBConfig {
/** the interval to log recover progress of each vsg when starting iotdb */
private long recoveryLogIntervalInMs = 5_000L;
- private boolean enableDiscardOutOfOrderData = false;
+ /**
+ * Separate sequence and unsequence data or not. If it is false, then all
data will be written
+ * into unsequence data dir.
+ */
+ private boolean enableSeparateData = true;
/** the method to transform device path to device id, can be 'Plain' or
'SHA256' */
private String deviceIDTransformationMethod = "Plain";
@@ -1402,12 +1406,12 @@ public class IoTDBConfig {
this.rpcPort = rpcPort;
}
- public boolean isEnableDiscardOutOfOrderData() {
- return enableDiscardOutOfOrderData;
+ public boolean isEnableSeparateData() {
+ return enableSeparateData;
}
- public void setEnableDiscardOutOfOrderData(boolean
enableDiscardOutOfOrderData) {
- this.enableDiscardOutOfOrderData = enableDiscardOutOfOrderData;
+ public void setEnableSeparateData(boolean enableSeparateData) {
+ this.enableSeparateData = enableSeparateData;
}
public String getSystemDir() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7949b5768f3..d2f1d9bcfa2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -834,11 +834,10 @@ public class IoTDBDescriptor {
properties.getProperty(
"recovery_log_interval_in_ms",
String.valueOf(conf.getRecoveryLogIntervalInMs()))));
- conf.setEnableDiscardOutOfOrderData(
+ conf.setEnableSeparateData(
Boolean.parseBoolean(
properties.getProperty(
- "enable_discard_out_of_order_data",
- Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+ "enable_separate_data",
Boolean.toString(conf.isEnableSeparateData()))));
conf.setWindowEvaluationThreadCount(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8b6c1d1289d..60515f8f459 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -145,7 +145,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
@@ -223,8 +222,6 @@ public class DataRegion implements IDataRegionForQuery {
private final CopyOnReadLinkedList<TsFileProcessor>
closingUnSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
- private final AtomicBoolean isSettling = new AtomicBoolean();
-
/** data region id. */
private final String dataRegionId;
/** database name. */
@@ -376,14 +373,6 @@ public class DataRegion implements IDataRegionForQuery {
return ret;
}
- public AtomicBoolean getIsSettling() {
- return isSettling;
- }
-
- public void setSettling(boolean isSettling) {
- this.isSettling.set(isSettling);
- }
-
/** this class is used to store recovering context. */
private class DataRegionRecoveryContext {
/** number of files to be recovered. */
@@ -547,15 +536,17 @@ public class DataRegion implements IDataRegionForQuery {
false,
partitionFiles.getKey() == latestPartitionId);
}
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- latestPartitionId,
- false,
- Long.MAX_VALUE,
- lastFlushTimeMap.getMemSize(latestPartitionId),
- true));
+ if (config.isEnableSeparateData()) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ latestPartitionId,
+ false,
+ Long.MAX_VALUE,
+ lastFlushTimeMap.getMemSize(latestPartitionId),
+ true));
+ }
}
// wait until all unsealed TsFiles have been recovered
for (WALRecoverListener recoverListener : recoverListeners) {
@@ -609,8 +600,12 @@ public class DataRegion implements IDataRegionForQuery {
long endTime = resource.getEndTime(deviceId);
endTimeMap.put(deviceId.intern(), endTime);
}
- lastFlushTimeMap.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
- lastFlushTimeMap.setMultiDeviceGlobalFlushedTime(endTimeMap);
+ if (config.isEnableSeparateData()) {
+ lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
+ }
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
+ }
}
public void initCompactionSchedule() {
@@ -829,7 +824,7 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileResource tsFileResource : resourceList) {
recoverSealedTsFiles(tsFileResource, context, isSeq);
}
- if (isLatestPartition) {
+ if (isLatestPartition && config.isEnableSeparateData()) {
lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
for (TsFileResource tsFileResource : resourceList) {
updateLastFlushTime(tsFileResource, isSeq);
@@ -875,7 +870,8 @@ public class DataRegion implements IDataRegionForQuery {
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
- if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -888,15 +884,10 @@ public class DataRegion implements IDataRegionForQuery {
}
boolean isSequence =
- insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(
- timePartitionId,
insertRowNode.getDevicePath().getFullPath());
-
- // is unsequence and user set config to discard out of order data
- if (!isSequence
- &&
IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
- return;
- }
+ config.isEnableSeparateData()
+ && insertRowNode.getTime()
+ > lastFlushTimeMap.getFlushedTime(
+ timePartitionId,
insertRowNode.getDevicePath().getFullPath());
// insert to sequence or unSequence file
insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
@@ -962,7 +953,8 @@ public class DataRegion implements IDataRegionForQuery {
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
// init map
- if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -975,8 +967,10 @@ public class DataRegion implements IDataRegionForQuery {
}
long lastFlushTime =
- lastFlushTimeMap.getFlushedTime(
- beforeTimePartition,
insertTabletNode.getDevicePath().getFullPath());
+ config.isEnableSeparateData()
+ ? lastFlushTimeMap.getFlushedTime(
+ beforeTimePartition,
insertTabletNode.getDevicePath().getFullPath())
+ : Long.MAX_VALUE;
// if is sequence
boolean isSequence = false;
@@ -986,12 +980,10 @@ public class DataRegion implements IDataRegionForQuery {
// judge if we should insert sequence
if (!isSequence && time > lastFlushTime) {
// insert into unsequence and then start sequence
- if
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
- noFailure =
- insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, false, results,
beforeTimePartition)
- && noFailure;
- }
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletNode, before, loc, false, results,
beforeTimePartition)
+ && noFailure;
before = loc;
isSequence = true;
}
@@ -999,18 +991,14 @@ public class DataRegion implements IDataRegionForQuery {
}
// do not forget last part
- if (before < loc
- && (isSequence
- ||
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+ if (before < loc) {
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
- long globalLatestFlushedTime =
-
lastFlushTimeMap.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
startTime = System.nanoTime();
- tryToUpdateBatchInsertLastCache(insertTabletNode,
globalLatestFlushedTime);
+ tryToUpdateBatchInsertLastCache(insertTabletNode);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
if (!noFailure) {
@@ -1083,13 +1071,15 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
- private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long
latestFlushedTime) {
+ private void tryToUpdateBatchInsertLastCache(InsertTabletNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
&& node.isSyncFromLeaderWhenUsingIoTConsensus())) {
// disable updating last cache on follower
return;
}
+ long latestFlushedTime =
+
lastFlushTimeMap.getGlobalFlushedTime(node.getDevicePath().getFullPath());
String[] measurements = node.getMeasurements();
MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
String[] rawMeasurements = new String[measurements.length];
@@ -1126,12 +1116,8 @@ public class DataRegion implements IDataRegionForQuery {
}
tsFileProcessor.insert(insertRowNode);
-
- long globalLatestFlushTime =
-
lastFlushTimeMap.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
-
long startTime = System.nanoTime();
- tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
+ tryToUpdateInsertLastCache(insertRowNode);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
// check memtable size and may asyncTryToFlush the work memtable
@@ -1140,13 +1126,15 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private void tryToUpdateInsertLastCache(InsertRowNode node, long
latestFlushedTime) {
+ private void tryToUpdateInsertLastCache(InsertRowNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
&& node.isSyncFromLeaderWhenUsingIoTConsensus())) {
// disable updating last cache on follower
return;
}
+ long latestFlushedTime =
+
lastFlushTimeMap.getGlobalFlushedTime(node.getDevicePath().getFullPath());
String[] measurements = node.getMeasurements();
MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
String[] rawMeasurements = new String[measurements.length];
@@ -2234,25 +2222,44 @@ public class DataRegion implements IDataRegionForQuery {
private void unsequenceFlushCallback(
TsFileProcessor processor, Map<String, Long> updateMap, long
systemFlushTime) {
- TimePartitionManager.getInstance()
- .updateAfterFlushing(
- new DataRegionId(Integer.valueOf(dataRegionId)),
- processor.getTimeRangeId(),
- systemFlushTime,
- lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
- workSequenceTsFileProcessors.get(processor.getTimeRangeId()) !=
null);
+ if (!config.isEnableSeparateData()
+ && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ // update globalLastFlushTime if and only if isEnableSeparateData is
false and
+ // isLastCacheEnable is true
+ lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(updateMap);
+ }
+ if (config.isEnableSeparateData()) {
+ TimePartitionManager.getInstance()
+ .updateAfterFlushing(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ processor.getTimeRangeId(),
+ systemFlushTime,
+ lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+ workSequenceTsFileProcessors.get(processor.getTimeRangeId()) !=
null);
+ }
}
private void sequenceFlushCallback(
TsFileProcessor processor, Map<String, Long> updateMap, long
systemFlushTime) {
- lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(),
updateMap);
- TimePartitionManager.getInstance()
- .updateAfterFlushing(
- new DataRegionId(Integer.valueOf(dataRegionId)),
- processor.getTimeRangeId(),
- systemFlushTime,
- lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
- workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) !=
null);
+ if (config.isEnableSeparateData()
+ && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ // update both partitionLastFlushTime and globalLastFlushTime
+ lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(),
updateMap);
+ } else {
+ // isEnableSeparateData is true and isLastCacheEnable is false, then
update
+ // partitionLastFlushTime only
+
lastFlushTimeMap.updateMultiDeviceFlushedTime(processor.getTimeRangeId(),
updateMap);
+ }
+
+ if (config.isEnableSeparateData()) {
+ TimePartitionManager.getInstance()
+ .updateAfterFlushing(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ processor.getTimeRangeId(),
+ systemFlushTime,
+ lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+ workSequenceTsFileProcessors.get(processor.getTimeRangeId()) !=
null);
+ }
}
/** put the memtable back to the MemTablePool and make the metadata in
writer visible */
@@ -2622,8 +2629,12 @@ public class DataRegion implements IDataRegionForQuery {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime);
- lastFlushTimeMap.updateFlushedTime(timePartitionId, device, endTime);
- lastFlushTimeMap.updateGlobalFlushedTime(device, endTime);
+ if (config.isEnableSeparateData()) {
+ lastFlushTimeMap.updateOneDeviceFlushedTime(timePartitionId, device,
endTime);
+ }
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ lastFlushTimeMap.updateOneDeviceGlobalFlushedTime(device, endTime);
+ }
}
}
@@ -2954,7 +2965,6 @@ public class DataRegion implements IDataRegionForQuery {
if (deleted) {
return;
}
- boolean isSequence = false;
for (int i = 0; i <
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
if (!isAlive(insertRowNode.getTime())) {
@@ -2976,7 +2986,8 @@ public class DataRegion implements IDataRegionForQuery {
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
- if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -2988,20 +2999,11 @@ public class DataRegion implements IDataRegionForQuery {
tsFileManager.isLatestTimePartition(timePartitionId)));
}
- // as the plans have been ordered, and we have get the write lock,
- // So, if a plan is sequenced, then all the rest plans are sequenced.
- //
- if (!isSequence) {
- isSequence =
- insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(
- timePartitionId,
insertRowNode.getDevicePath().getFullPath());
- }
- // is unsequence and user set config to discard out of order data
- if (!isSequence
- &&
IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
- return;
- }
+ boolean isSequence =
+ config.isEnableSeparateData()
+ && insertRowNode.getTime()
+ > lastFlushTimeMap.getFlushedTime(
+ timePartitionId,
insertRowNode.getDevicePath().getFullPath());
// insert to sequence or unSequence file
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 7fce13405fd..d8215aff763 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -54,81 +54,38 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
* data point should be put into a sequential file or an unsequential file.
Data of some device
* with timestamp less than or equals to the device's latestFlushedTime
should go into an
* unsequential file.
+ *
+ * <p>It is used to separate sequence and unsequence data.
*/
- private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice
= new HashMap<>();
- /** used to record the latest flush time while upgrading and inserting */
- private Map<Long, Map<String, Long>>
newlyFlushedPartitionLatestFlushedTimeForEachDevice =
+ private final Map<Long, Map<String, Long>>
partitionLatestFlushedTimeForEachDevice =
new HashMap<>();
+
/**
* global mapping of device -> largest timestamp of the latest memtable to *
be submitted to
* asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to
maintain global
* latestFlushedTime of devices and will be updated along with
* partitionLatestFlushedTimeForEachDevice
+ *
+ * <p>It is used to update last cache.
*/
- private Map<String, Long> globalLatestFlushedTimeForEachDevice = new
HashMap<>();
+ private final Map<String, Long> globalLatestFlushedTimeForEachDevice = new
HashMap<>();
/** used for recovering flush time from tsfile resource */
TsFileManager tsFileManager;
/** record memory cost of map for each partitionId */
- private Map<Long, Long> memCostForEachPartition = new HashMap<>();
+ private final Map<Long, Long> memCostForEachPartition = new HashMap<>();
public HashLastFlushTimeMap(TsFileManager tsFileManager) {
this.tsFileManager = tsFileManager;
}
@Override
- public void setMultiDeviceFlushedTime(long timePartitionId, Map<String,
Long> flushedTimeMap) {
- Map<String, Long> flushTimeMapForPartition =
- partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
- if (flushTimeMapForPartition == null) {
- return;
- }
- long memIncr = 0;
- for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
- if (!flushTimeMapForPartition.containsKey(entry.getKey())) {
- memIncr += HASHMAP_NODE_BASIC_SIZE + 2L * entry.getKey().length();
- }
- flushTimeMapForPartition.merge(entry.getKey(), entry.getValue(),
Math::max);
- }
- long finalMemIncr = memIncr;
- memCostForEachPartition.compute(
- timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
- }
-
- @Override
- public void setOneDeviceFlushedTime(long timePartitionId, String path, long
time) {
+ public void updateOneDeviceFlushedTime(long timePartitionId, String path,
long time) {
Map<String, Long> flushTimeMapForPartition =
- partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
- if (flushTimeMapForPartition == null) {
- return;
- }
- if (flushTimeMapForPartition.put(path, time) == null) {
- long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
- memCostForEachPartition.compute(
- timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
- }
- }
-
- @Override
- public void setMultiDeviceGlobalFlushedTime(Map<String, Long>
globalFlushedTimeMap) {
- for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) {
- globalLatestFlushedTimeForEachDevice.merge(entry.getKey(),
entry.getValue(), Math::max);
- }
- }
-
- @Override
- public void setOneDeviceGlobalFlushedTime(String path, long time) {
- globalLatestFlushedTimeForEachDevice.put(path, time);
- }
+ partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+ timePartitionId, id -> new HashMap<>());
- @Override
- public void updateFlushedTime(long timePartitionId, String path, long time) {
- Map<String, Long> flushTimeMapForPartition =
- partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
- if (flushTimeMapForPartition == null) {
- return;
- }
flushTimeMapForPartition.compute(
path,
(k, v) -> {
@@ -146,17 +103,34 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
@Override
- public void updateGlobalFlushedTime(String path, long time) {
+ public void updateMultiDeviceFlushedTime(long timePartitionId, Map<String,
Long> flushedTimeMap) {
+ Map<String, Long> flushTimeMapForPartition =
+ partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+ timePartitionId, id -> new HashMap<>());
+
+ long memIncr = 0;
+ for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
+ if (!flushTimeMapForPartition.containsKey(entry.getKey())) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + 2L * entry.getKey().length();
+ }
+ flushTimeMapForPartition.merge(entry.getKey(), entry.getValue(),
Math::max);
+ }
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
+ }
+
+ @Override
+ public void updateOneDeviceGlobalFlushedTime(String path, long time) {
globalLatestFlushedTimeForEachDevice.compute(
path, (k, v) -> v == null ? time : Math.max(v, time));
}
@Override
- public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- long partitionId, String deviceId, long time) {
- newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+ public void updateMultiDeviceGlobalFlushedTime(Map<String, Long>
globalFlushedTimeMap) {
+ for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) {
+ globalLatestFlushedTimeForEachDevice.merge(entry.getKey(),
entry.getValue(), Math::max);
+ }
}
@Override
@@ -168,33 +142,12 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
return true;
}
- @Override
- public void applyNewlyFlushedTimeToFlushedTime() {
- for (Map.Entry<Long, Map<String, Long>> entry :
- newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
- long timePartitionId = entry.getKey();
- Map<String, Long> latestFlushTimeForPartition =
-
partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new
HashMap<>());
- for (Map.Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
- String device = endTimeMap.getKey();
- long endTime = endTimeMap.getValue();
- if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) <
endTime) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .put(device, endTime);
- }
- }
- }
- }
-
@Override
public void updateLatestFlushTime(long partitionId, Map<String, Long>
updateMap) {
for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(partitionId, id -> new HashMap<>())
.merge(entry.getKey(), entry.getValue(), Math::max);
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- partitionId, entry.getKey(), entry.getValue());
if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(),
Long.MIN_VALUE)
< entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 023d5e3c0c8..97f114e04a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -24,24 +24,21 @@ import java.util.Map;
/** This interface manages last time and flush time for sequence and
unsequence determination */
public interface ILastFlushTimeMap {
- // region set
- void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long>
flushedTimeMap);
-
- void setOneDeviceFlushedTime(long timePartitionId, String path, long time);
-
- void setMultiDeviceGlobalFlushedTime(Map<String, Long> globalFlushedTimeMap);
-
- void setOneDeviceGlobalFlushedTime(String path, long time);
- // endregion
-
// region update
+ /** Update partitionLatestFlushedTimeForEachDevice. */
+ void updateOneDeviceFlushedTime(long timePartitionId, String path, long
time);
+
+ void updateMultiDeviceFlushedTime(long timePartitionId, Map<String, Long>
flushedTimeMap);
- void updateFlushedTime(long timePartitionId, String path, long time);
+ /** Update globalLatestFlushedTimeForEachDevice. */
+ void updateOneDeviceGlobalFlushedTime(String path, long time);
- void updateGlobalFlushedTime(String path, long time);
+ void updateMultiDeviceGlobalFlushedTime(Map<String, Long>
globalFlushedTimeMap);
- void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- long partitionId, String deviceId, long time);
+ /**
+ * Update both partitionLatestFlushedTimeForEachDevice and
globalLatestFlushedTimeForEachDevice.
+ */
+ void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
// endregion
// region ensure
@@ -49,12 +46,6 @@ public interface ILastFlushTimeMap {
// endregion
- // region support upgrade methods
- void applyNewlyFlushedTimeToFlushedTime();
-
- void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
- // endregion
-
// region read
long getFlushedTime(long timePartitionId, String path);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 88569a6329b..7f8b53c5bd8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -541,10 +541,10 @@ public class DataRegionTest {
}
@Test
- public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
+ public void testDisableSeparateDataForInsertRowPlan()
throws WriteProcessException, QueryProcessException,
IllegalPathException, IOException {
- boolean defaultValue = config.isEnableDiscardOutOfOrderData();
- config.setEnableDiscardOutOfOrderData(true);
+ boolean defaultValue = config.isEnableSeparateData();
+ config.setEnableSeparateData(false);
for (int j = 21; j <= 30; j++) {
TSRecord record = new TSRecord(j, deviceId);
@@ -573,8 +573,8 @@ public class DataRegionTest {
deviceId,
context,
null);
- Assert.assertEquals(10, queryDataSource.getSeqResources().size());
- Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(20, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
}
@@ -582,15 +582,15 @@ public class DataRegionTest {
Assert.assertTrue(resource.isClosed());
}
- config.setEnableDiscardOutOfOrderData(defaultValue);
+ config.setEnableSeparateData(defaultValue);
}
@Test
- public void testEnableDiscardOutOfOrderDataForInsertTablet1()
+ public void testDisableSeparateDataForInsertTablet1()
throws QueryProcessException, IllegalPathException, IOException,
WriteProcessException {
- boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+ boolean defaultEnableDiscard = config.isEnableSeparateData();
long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval();
- config.setEnableDiscardOutOfOrderData(true);
+ config.setEnableSeparateData(false);
COMMON_CONFIG.setTimePartitionInterval(100000);
String[] measurements = new String[2];
@@ -663,22 +663,22 @@ public class DataRegionTest {
context,
null);
- Assert.assertEquals(2, queryDataSource.getSeqResources().size());
- Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
}
- config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+ config.setEnableSeparateData(defaultEnableDiscard);
COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition);
}
@Test
- public void testEnableDiscardOutOfOrderDataForInsertTablet2()
+ public void testDisableSeparateDataForInsertTablet2()
throws QueryProcessException, IllegalPathException, IOException,
WriteProcessException {
- boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+ boolean defaultEnableDiscard = config.isEnableSeparateData();
long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval();
- config.setEnableDiscardOutOfOrderData(true);
+ config.setEnableSeparateData(false);
COMMON_CONFIG.setTimePartitionInterval(1200000);
String[] measurements = new String[2];
@@ -751,22 +751,22 @@ public class DataRegionTest {
context,
null);
- Assert.assertEquals(2, queryDataSource.getSeqResources().size());
- Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
}
- config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+ config.setEnableSeparateData(defaultEnableDiscard);
COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition);
}
@Test
- public void testEnableDiscardOutOfOrderDataForInsertTablet3()
+ public void testDisableSeparateDataForInsertTablet3()
throws QueryProcessException, IllegalPathException, IOException,
WriteProcessException {
- boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+ boolean defaultEnableDiscard = config.isEnableSeparateData();
long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval();
- config.setEnableDiscardOutOfOrderData(true);
+ config.setEnableSeparateData(false);
COMMON_CONFIG.setTimePartitionInterval(1000000);
String[] measurements = new String[2];
@@ -839,13 +839,13 @@ public class DataRegionTest {
context,
null);
- Assert.assertEquals(2, queryDataSource.getSeqResources().size());
- Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+ Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
}
- config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+ config.setEnableSeparateData(defaultEnableDiscard);
COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition);
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 5d383828ac5..0e0fc96f614 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -467,10 +467,10 @@ timestamp_precision=ms
# Datatype: int
# max_waiting_time_when_insert_blocked=10000
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
+# Add a switch to enable separate sequence and unsequence data.
+# If it is true, then data will be separated into seq and unseq data dir. If it is false, then all data will be written into unseq data dir.
# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# enable_separate_data=true
# What will the system do when unrecoverable error occurs.
# Datatype: String