This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 88549541f6b Add a param to switch whether to enable data separation or not (#11692)
88549541f6b is described below

commit 88549541f6bf53717169294a2fe2b222d05ddede
Author: 周沛辰 <[email protected]>
AuthorDate: Thu Dec 14 21:45:02 2023 -0600

    Add a param to switch whether to enable data separation or not (#11692)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +-
 .../db/storageengine/dataregion/DataRegion.java    | 178 +++++++++++----------
 .../dataregion/HashLastFlushTimeMap.java           | 115 ++++---------
 .../dataregion/ILastFlushTimeMap.java              |  31 ++--
 .../storageengine/dataregion/DataRegionTest.java   |  48 +++---
 .../resources/conf/iotdb-common.properties         |   6 +-
 7 files changed, 173 insertions(+), 224 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 69d6b06f60d..e921a374ef6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -839,7 +839,11 @@ public class IoTDBConfig {
   /** the interval to log recover progress of each vsg when starting iotdb */
   private long recoveryLogIntervalInMs = 5_000L;
 
-  private boolean enableDiscardOutOfOrderData = false;
+  /**
+   * Separate sequence and unsequence data or not. If it is false, then all 
data will be written
+   * into unsequence data dir.
+   */
+  private boolean enableSeparateData = true;
 
   /** the method to transform device path to device id, can be 'Plain' or 
'SHA256' */
   private String deviceIDTransformationMethod = "Plain";
@@ -1402,12 +1406,12 @@ public class IoTDBConfig {
     this.rpcPort = rpcPort;
   }
 
-  public boolean isEnableDiscardOutOfOrderData() {
-    return enableDiscardOutOfOrderData;
+  public boolean isEnableSeparateData() {
+    return enableSeparateData;
   }
 
-  public void setEnableDiscardOutOfOrderData(boolean 
enableDiscardOutOfOrderData) {
-    this.enableDiscardOutOfOrderData = enableDiscardOutOfOrderData;
+  public void setEnableSeparateData(boolean enableSeparateData) {
+    this.enableSeparateData = enableSeparateData;
   }
 
   public String getSystemDir() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7949b5768f3..d2f1d9bcfa2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -834,11 +834,10 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "recovery_log_interval_in_ms", 
String.valueOf(conf.getRecoveryLogIntervalInMs()))));
 
-    conf.setEnableDiscardOutOfOrderData(
+    conf.setEnableSeparateData(
         Boolean.parseBoolean(
             properties.getProperty(
-                "enable_discard_out_of_order_data",
-                Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+                "enable_separate_data", 
Boolean.toString(conf.isEnableSeparateData()))));
 
     conf.setWindowEvaluationThreadCount(
         Integer.parseInt(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8b6c1d1289d..60515f8f459 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -145,7 +145,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -223,8 +222,6 @@ public class DataRegion implements IDataRegionForQuery {
   private final CopyOnReadLinkedList<TsFileProcessor> 
closingUnSequenceTsFileProcessor =
       new CopyOnReadLinkedList<>();
 
-  private final AtomicBoolean isSettling = new AtomicBoolean();
-
   /** data region id. */
   private final String dataRegionId;
   /** database name. */
@@ -376,14 +373,6 @@ public class DataRegion implements IDataRegionForQuery {
     return ret;
   }
 
-  public AtomicBoolean getIsSettling() {
-    return isSettling;
-  }
-
-  public void setSettling(boolean isSettling) {
-    this.isSettling.set(isSettling);
-  }
-
   /** this class is used to store recovering context. */
   private class DataRegionRecoveryContext {
     /** number of files to be recovered. */
@@ -547,15 +536,17 @@ public class DataRegion implements IDataRegionForQuery {
               false,
               partitionFiles.getKey() == latestPartitionId);
         }
-        TimePartitionManager.getInstance()
-            .registerTimePartitionInfo(
-                new TimePartitionInfo(
-                    new DataRegionId(Integer.parseInt(dataRegionId)),
-                    latestPartitionId,
-                    false,
-                    Long.MAX_VALUE,
-                    lastFlushTimeMap.getMemSize(latestPartitionId),
-                    true));
+        if (config.isEnableSeparateData()) {
+          TimePartitionManager.getInstance()
+              .registerTimePartitionInfo(
+                  new TimePartitionInfo(
+                      new DataRegionId(Integer.parseInt(dataRegionId)),
+                      latestPartitionId,
+                      false,
+                      Long.MAX_VALUE,
+                      lastFlushTimeMap.getMemSize(latestPartitionId),
+                      true));
+        }
       }
       // wait until all unsealed TsFiles have been recovered
       for (WALRecoverListener recoverListener : recoverListeners) {
@@ -609,8 +600,12 @@ public class DataRegion implements IDataRegionForQuery {
       long endTime = resource.getEndTime(deviceId);
       endTimeMap.put(deviceId.intern(), endTime);
     }
-    lastFlushTimeMap.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
-    lastFlushTimeMap.setMultiDeviceGlobalFlushedTime(endTimeMap);
+    if (config.isEnableSeparateData()) {
+      lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
+    }
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
+    }
   }
 
   public void initCompactionSchedule() {
@@ -829,7 +824,7 @@ public class DataRegion implements IDataRegionForQuery {
     for (TsFileResource tsFileResource : resourceList) {
       recoverSealedTsFiles(tsFileResource, context, isSeq);
     }
-    if (isLatestPartition) {
+    if (isLatestPartition && config.isEnableSeparateData()) {
       lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
       for (TsFileResource tsFileResource : resourceList) {
         updateLastFlushTime(tsFileResource, isSeq);
@@ -875,7 +870,8 @@ public class DataRegion implements IDataRegionForQuery {
       // init map
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
 
-      if 
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+      if (config.isEnableSeparateData()
+          && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
         TimePartitionManager.getInstance()
             .registerTimePartitionInfo(
                 new TimePartitionInfo(
@@ -888,15 +884,10 @@ public class DataRegion implements IDataRegionForQuery {
       }
 
       boolean isSequence =
-          insertRowNode.getTime()
-              > lastFlushTimeMap.getFlushedTime(
-                  timePartitionId, 
insertRowNode.getDevicePath().getFullPath());
-
-      // is unsequence and user set config to discard out of order data
-      if (!isSequence
-          && 
IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
-        return;
-      }
+          config.isEnableSeparateData()
+              && insertRowNode.getTime()
+                  > lastFlushTimeMap.getFlushedTime(
+                      timePartitionId, 
insertRowNode.getDevicePath().getFullPath());
 
       // insert to sequence or unSequence file
       insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
@@ -962,7 +953,8 @@ public class DataRegion implements IDataRegionForQuery {
           
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
       // init map
 
-      if 
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
+      if (config.isEnableSeparateData()
+          && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
         TimePartitionManager.getInstance()
             .registerTimePartitionInfo(
                 new TimePartitionInfo(
@@ -975,8 +967,10 @@ public class DataRegion implements IDataRegionForQuery {
       }
 
       long lastFlushTime =
-          lastFlushTimeMap.getFlushedTime(
-              beforeTimePartition, 
insertTabletNode.getDevicePath().getFullPath());
+          config.isEnableSeparateData()
+              ? lastFlushTimeMap.getFlushedTime(
+                  beforeTimePartition, 
insertTabletNode.getDevicePath().getFullPath())
+              : Long.MAX_VALUE;
 
       // if is sequence
       boolean isSequence = false;
@@ -986,12 +980,10 @@ public class DataRegion implements IDataRegionForQuery {
         // judge if we should insert sequence
         if (!isSequence && time > lastFlushTime) {
           // insert into unsequence and then start sequence
-          if 
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
-            noFailure =
-                insertTabletToTsFileProcessor(
-                        insertTabletNode, before, loc, false, results, 
beforeTimePartition)
-                    && noFailure;
-          }
+          noFailure =
+              insertTabletToTsFileProcessor(
+                      insertTabletNode, before, loc, false, results, 
beforeTimePartition)
+                  && noFailure;
           before = loc;
           isSequence = true;
         }
@@ -999,18 +991,14 @@ public class DataRegion implements IDataRegionForQuery {
       }
 
       // do not forget last part
-      if (before < loc
-          && (isSequence
-              || 
!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+      if (before < loc) {
         noFailure =
             insertTabletToTsFileProcessor(
                     insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition)
                 && noFailure;
       }
-      long globalLatestFlushedTime =
-          
lastFlushTimeMap.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
       startTime = System.nanoTime();
-      tryToUpdateBatchInsertLastCache(insertTabletNode, 
globalLatestFlushedTime);
+      tryToUpdateBatchInsertLastCache(insertTabletNode);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
 
       if (!noFailure) {
@@ -1083,13 +1071,15 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
-  private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long 
latestFlushedTime) {
+  private void tryToUpdateBatchInsertLastCache(InsertTabletNode node) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
         || 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
             && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
       // disable updating last cache on follower
       return;
     }
+    long latestFlushedTime =
+        
lastFlushTimeMap.getGlobalFlushedTime(node.getDevicePath().getFullPath());
     String[] measurements = node.getMeasurements();
     MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
     String[] rawMeasurements = new String[measurements.length];
@@ -1126,12 +1116,8 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     tsFileProcessor.insert(insertRowNode);
-
-    long globalLatestFlushTime =
-        
lastFlushTimeMap.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
-
     long startTime = System.nanoTime();
-    tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
+    tryToUpdateInsertLastCache(insertRowNode);
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
 
     // check memtable size and may asyncTryToFlush the work memtable
@@ -1140,13 +1126,15 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  private void tryToUpdateInsertLastCache(InsertRowNode node, long 
latestFlushedTime) {
+  private void tryToUpdateInsertLastCache(InsertRowNode node) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
         || 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
             && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
       // disable updating last cache on follower
       return;
     }
+    long latestFlushedTime =
+        
lastFlushTimeMap.getGlobalFlushedTime(node.getDevicePath().getFullPath());
     String[] measurements = node.getMeasurements();
     MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
     String[] rawMeasurements = new String[measurements.length];
@@ -2234,25 +2222,44 @@ public class DataRegion implements IDataRegionForQuery {
 
   private void unsequenceFlushCallback(
       TsFileProcessor processor, Map<String, Long> updateMap, long 
systemFlushTime) {
-    TimePartitionManager.getInstance()
-        .updateAfterFlushing(
-            new DataRegionId(Integer.valueOf(dataRegionId)),
-            processor.getTimeRangeId(),
-            systemFlushTime,
-            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
-            workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != 
null);
+    if (!config.isEnableSeparateData()
+        && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      // update globalLastFlushTime if and only if isEnableSeparateData is 
false and
+      // isLastCacheEnable is true
+      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(updateMap);
+    }
+    if (config.isEnableSeparateData()) {
+      TimePartitionManager.getInstance()
+          .updateAfterFlushing(
+              new DataRegionId(Integer.valueOf(dataRegionId)),
+              processor.getTimeRangeId(),
+              systemFlushTime,
+              lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+              workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != 
null);
+    }
   }
 
   private void sequenceFlushCallback(
       TsFileProcessor processor, Map<String, Long> updateMap, long 
systemFlushTime) {
-    lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), 
updateMap);
-    TimePartitionManager.getInstance()
-        .updateAfterFlushing(
-            new DataRegionId(Integer.valueOf(dataRegionId)),
-            processor.getTimeRangeId(),
-            systemFlushTime,
-            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
-            workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != 
null);
+    if (config.isEnableSeparateData()
+        && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      // update both partitionLastFlushTime and globalLastFlushTime
+      lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), 
updateMap);
+    } else {
+      // isEnableSeparateData is true and isLastCacheEnable is false, then 
update
+      // partitionLastFlushTime only
+      
lastFlushTimeMap.updateMultiDeviceFlushedTime(processor.getTimeRangeId(), 
updateMap);
+    }
+
+    if (config.isEnableSeparateData()) {
+      TimePartitionManager.getInstance()
+          .updateAfterFlushing(
+              new DataRegionId(Integer.valueOf(dataRegionId)),
+              processor.getTimeRangeId(),
+              systemFlushTime,
+              lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+              workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != 
null);
+    }
   }
 
   /** put the memtable back to the MemTablePool and make the metadata in 
writer visible */
@@ -2622,8 +2629,12 @@ public class DataRegion implements IDataRegionForQuery {
     for (String device : newTsFileResource.getDevices()) {
       long endTime = newTsFileResource.getEndTime(device);
       long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime);
-      lastFlushTimeMap.updateFlushedTime(timePartitionId, device, endTime);
-      lastFlushTimeMap.updateGlobalFlushedTime(device, endTime);
+      if (config.isEnableSeparateData()) {
+        lastFlushTimeMap.updateOneDeviceFlushedTime(timePartitionId, device, 
endTime);
+      }
+      if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+        lastFlushTimeMap.updateOneDeviceGlobalFlushedTime(device, endTime);
+      }
     }
   }
 
@@ -2954,7 +2965,6 @@ public class DataRegion implements IDataRegionForQuery {
       if (deleted) {
         return;
       }
-      boolean isSequence = false;
       for (int i = 0; i < 
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
         InsertRowNode insertRowNode = 
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
         if (!isAlive(insertRowNode.getTime())) {
@@ -2976,7 +2986,8 @@ public class DataRegion implements IDataRegionForQuery {
         // init map
         long timePartitionId = 
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
 
-        if 
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+        if (config.isEnableSeparateData()
+            && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
           TimePartitionManager.getInstance()
               .registerTimePartitionInfo(
                   new TimePartitionInfo(
@@ -2988,20 +2999,11 @@ public class DataRegion implements IDataRegionForQuery {
                       tsFileManager.isLatestTimePartition(timePartitionId)));
         }
 
-        // as the plans have been ordered, and we have get the write lock,
-        // So, if a plan is sequenced, then all the rest plans are sequenced.
-        //
-        if (!isSequence) {
-          isSequence =
-              insertRowNode.getTime()
-                  > lastFlushTimeMap.getFlushedTime(
-                      timePartitionId, 
insertRowNode.getDevicePath().getFullPath());
-        }
-        // is unsequence and user set config to discard out of order data
-        if (!isSequence
-            && 
IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
-          return;
-        }
+        boolean isSequence =
+            config.isEnableSeparateData()
+                && insertRowNode.getTime()
+                    > lastFlushTimeMap.getFlushedTime(
+                        timePartitionId, 
insertRowNode.getDevicePath().getFullPath());
 
         // insert to sequence or unSequence file
         try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 7fce13405fd..d8215aff763 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -54,81 +54,38 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
    * data point should be put into a sequential file or an unsequential file. 
Data of some device
    * with timestamp less than or equals to the device's latestFlushedTime 
should go into an
    * unsequential file.
+   *
+   * <p>It is used to separate sequence and unsequence data.
    */
-  private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice 
= new HashMap<>();
-  /** used to record the latest flush time while upgrading and inserting */
-  private Map<Long, Map<String, Long>> 
newlyFlushedPartitionLatestFlushedTimeForEachDevice =
+  private final Map<Long, Map<String, Long>> 
partitionLatestFlushedTimeForEachDevice =
       new HashMap<>();
+
   /**
    * global mapping of device -> largest timestamp of the latest memtable to * 
be submitted to
    * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to 
maintain global
    * latestFlushedTime of devices and will be updated along with
    * partitionLatestFlushedTimeForEachDevice
+   *
+   * <p>It is used to update last cache.
    */
-  private Map<String, Long> globalLatestFlushedTimeForEachDevice = new 
HashMap<>();
+  private final Map<String, Long> globalLatestFlushedTimeForEachDevice = new 
HashMap<>();
 
   /** used for recovering flush time from tsfile resource */
   TsFileManager tsFileManager;
 
   /** record memory cost of map for each partitionId */
-  private Map<Long, Long> memCostForEachPartition = new HashMap<>();
+  private final Map<Long, Long> memCostForEachPartition = new HashMap<>();
 
   public HashLastFlushTimeMap(TsFileManager tsFileManager) {
     this.tsFileManager = tsFileManager;
   }
 
   @Override
-  public void setMultiDeviceFlushedTime(long timePartitionId, Map<String, 
Long> flushedTimeMap) {
-    Map<String, Long> flushTimeMapForPartition =
-        partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
-    if (flushTimeMapForPartition == null) {
-      return;
-    }
-    long memIncr = 0;
-    for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
-      if (!flushTimeMapForPartition.containsKey(entry.getKey())) {
-        memIncr += HASHMAP_NODE_BASIC_SIZE + 2L * entry.getKey().length();
-      }
-      flushTimeMapForPartition.merge(entry.getKey(), entry.getValue(), 
Math::max);
-    }
-    long finalMemIncr = memIncr;
-    memCostForEachPartition.compute(
-        timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + 
finalMemIncr);
-  }
-
-  @Override
-  public void setOneDeviceFlushedTime(long timePartitionId, String path, long 
time) {
+  public void updateOneDeviceFlushedTime(long timePartitionId, String path, 
long time) {
     Map<String, Long> flushTimeMapForPartition =
-        partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
-    if (flushTimeMapForPartition == null) {
-      return;
-    }
-    if (flushTimeMapForPartition.put(path, time) == null) {
-      long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
-      memCostForEachPartition.compute(
-          timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
-    }
-  }
-
-  @Override
-  public void setMultiDeviceGlobalFlushedTime(Map<String, Long> 
globalFlushedTimeMap) {
-    for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) {
-      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), 
entry.getValue(), Math::max);
-    }
-  }
-
-  @Override
-  public void setOneDeviceGlobalFlushedTime(String path, long time) {
-    globalLatestFlushedTimeForEachDevice.put(path, time);
-  }
+        partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+            timePartitionId, id -> new HashMap<>());
 
-  @Override
-  public void updateFlushedTime(long timePartitionId, String path, long time) {
-    Map<String, Long> flushTimeMapForPartition =
-        partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
-    if (flushTimeMapForPartition == null) {
-      return;
-    }
     flushTimeMapForPartition.compute(
         path,
         (k, v) -> {
@@ -146,17 +103,34 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
   }
 
   @Override
-  public void updateGlobalFlushedTime(String path, long time) {
+  public void updateMultiDeviceFlushedTime(long timePartitionId, Map<String, 
Long> flushedTimeMap) {
+    Map<String, Long> flushTimeMapForPartition =
+        partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+            timePartitionId, id -> new HashMap<>());
+
+    long memIncr = 0;
+    for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
+      if (!flushTimeMapForPartition.containsKey(entry.getKey())) {
+        memIncr += HASHMAP_NODE_BASIC_SIZE + 2L * entry.getKey().length();
+      }
+      flushTimeMapForPartition.merge(entry.getKey(), entry.getValue(), 
Math::max);
+    }
+    long finalMemIncr = memIncr;
+    memCostForEachPartition.compute(
+        timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + 
finalMemIncr);
+  }
+
+  @Override
+  public void updateOneDeviceGlobalFlushedTime(String path, long time) {
     globalLatestFlushedTimeForEachDevice.compute(
         path, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
   @Override
-  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
-      long partitionId, String deviceId, long time) {
-    newlyFlushedPartitionLatestFlushedTimeForEachDevice
-        .computeIfAbsent(partitionId, id -> new HashMap<>())
-        .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+  public void updateMultiDeviceGlobalFlushedTime(Map<String, Long> 
globalFlushedTimeMap) {
+    for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) {
+      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), 
entry.getValue(), Math::max);
+    }
   }
 
   @Override
@@ -168,33 +142,12 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
     return true;
   }
 
-  @Override
-  public void applyNewlyFlushedTimeToFlushedTime() {
-    for (Map.Entry<Long, Map<String, Long>> entry :
-        newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
-      long timePartitionId = entry.getKey();
-      Map<String, Long> latestFlushTimeForPartition =
-          
partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new 
HashMap<>());
-      for (Map.Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
-        String device = endTimeMap.getKey();
-        long endTime = endTimeMap.getValue();
-        if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < 
endTime) {
-          partitionLatestFlushedTimeForEachDevice
-              .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-              .put(device, endTime);
-        }
-      }
-    }
-  }
-
   @Override
   public void updateLatestFlushTime(long partitionId, Map<String, Long> 
updateMap) {
     for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(partitionId, id -> new HashMap<>())
           .merge(entry.getKey(), entry.getValue(), Math::max);
-      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
-          partitionId, entry.getKey(), entry.getValue());
       if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), 
Long.MIN_VALUE)
           < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), 
entry.getValue());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 023d5e3c0c8..97f114e04a6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -24,24 +24,21 @@ import java.util.Map;
 /** This interface manages last time and flush time for sequence and 
unsequence determination */
 public interface ILastFlushTimeMap {
 
-  // region set
-  void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> 
flushedTimeMap);
-
-  void setOneDeviceFlushedTime(long timePartitionId, String path, long time);
-
-  void setMultiDeviceGlobalFlushedTime(Map<String, Long> globalFlushedTimeMap);
-
-  void setOneDeviceGlobalFlushedTime(String path, long time);
-  // endregion
-
   // region update
+  /** Update partitionLatestFlushedTimeForEachDevice. */
+  void updateOneDeviceFlushedTime(long timePartitionId, String path, long 
time);
+
+  void updateMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> 
flushedTimeMap);
 
-  void updateFlushedTime(long timePartitionId, String path, long time);
+  /** Update globalLatestFlushedTimeForEachDevice. */
+  void updateOneDeviceGlobalFlushedTime(String path, long time);
 
-  void updateGlobalFlushedTime(String path, long time);
+  void updateMultiDeviceGlobalFlushedTime(Map<String, Long> 
globalFlushedTimeMap);
 
-  void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
-      long partitionId, String deviceId, long time);
+  /**
+   * Update both partitionLatestFlushedTimeForEachDevice and 
globalLatestFlushedTimeForEachDevice.
+   */
+  void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
   // endregion
 
   // region ensure
@@ -49,12 +46,6 @@ public interface ILastFlushTimeMap {
 
   // endregion
 
-  // region support upgrade methods
-  void applyNewlyFlushedTimeToFlushedTime();
-
-  void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
-  // endregion
-
   // region read
   long getFlushedTime(long timePartitionId, String path);
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 88569a6329b..7f8b53c5bd8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -541,10 +541,10 @@ public class DataRegionTest {
   }
 
   @Test
-  public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
+  public void testDisableSeparateDataForInsertRowPlan()
       throws WriteProcessException, QueryProcessException, 
IllegalPathException, IOException {
-    boolean defaultValue = config.isEnableDiscardOutOfOrderData();
-    config.setEnableDiscardOutOfOrderData(true);
+    boolean defaultValue = config.isEnableSeparateData();
+    config.setEnableSeparateData(false);
 
     for (int j = 21; j <= 30; j++) {
       TSRecord record = new TSRecord(j, deviceId);
@@ -573,8 +573,8 @@ public class DataRegionTest {
             deviceId,
             context,
             null);
-    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(20, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
@@ -582,15 +582,15 @@ public class DataRegionTest {
       Assert.assertTrue(resource.isClosed());
     }
 
-    config.setEnableDiscardOutOfOrderData(defaultValue);
+    config.setEnableSeparateData(defaultValue);
   }
 
   @Test
-  public void testEnableDiscardOutOfOrderDataForInsertTablet1()
+  public void testDisableSeparateDataForInsertTablet1()
       throws QueryProcessException, IllegalPathException, IOException, 
WriteProcessException {
-    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    boolean defaultEnableDiscard = config.isEnableSeparateData();
     long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval();
-    config.setEnableDiscardOutOfOrderData(true);
+    config.setEnableSeparateData(false);
     COMMON_CONFIG.setTimePartitionInterval(100000);
 
     String[] measurements = new String[2];
@@ -663,22 +663,22 @@ public class DataRegionTest {
             context,
             null);
 
-    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
 
-    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+    config.setEnableSeparateData(defaultEnableDiscard);
     COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition);
   }
 
   @Test
-  public void testEnableDiscardOutOfOrderDataForInsertTablet2()
+  public void testDisableSeparateDataForInsertTablet2()
       throws QueryProcessException, IllegalPathException, IOException, 
WriteProcessException {
-    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    boolean defaultEnableDiscard = config.isEnableSeparateData();
     long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval();
-    config.setEnableDiscardOutOfOrderData(true);
+    config.setEnableSeparateData(false);
     COMMON_CONFIG.setTimePartitionInterval(1200000);
 
     String[] measurements = new String[2];
@@ -751,22 +751,22 @@ public class DataRegionTest {
             context,
             null);
 
-    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
 
-    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+    config.setEnableSeparateData(defaultEnableDiscard);
     COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition);
   }
 
   @Test
-  public void testEnableDiscardOutOfOrderDataForInsertTablet3()
+  public void testDisableSeparateDataForInsertTablet3()
       throws QueryProcessException, IllegalPathException, IOException, 
WriteProcessException {
-    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
+    boolean defaultEnableDiscard = config.isEnableSeparateData();
     long defaultTimePartition = COMMON_CONFIG.getTimePartitionInterval();
-    config.setEnableDiscardOutOfOrderData(true);
+    config.setEnableSeparateData(false);
     COMMON_CONFIG.setTimePartitionInterval(1000000);
 
     String[] measurements = new String[2];
@@ -839,13 +839,13 @@ public class DataRegionTest {
             context,
             null);
 
-    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    Assert.assertEquals(0, queryDataSource.getSeqResources().size());
+    Assert.assertEquals(2, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
 
-    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
+    config.setEnableSeparateData(defaultEnableDiscard);
     COMMON_CONFIG.setTimePartitionInterval(defaultTimePartition);
   }
 
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 5d383828ac5..0e0fc96f614 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -467,10 +467,10 @@ timestamp_precision=ms
 # Datatype: int
 # max_waiting_time_when_insert_blocked=10000
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not 
care about discarding some out-of-order data.
+# Add a switch to enable separate sequence and unsequence data.
+# If it is true, then data will be separated into seq and unseq data dir. If 
it is false, then all data will be written into unseq data dir.
 # Datatype: boolean
-# enable_discard_out_of_order_data=false
+# enable_separate_data=true
 
 # What will the system do when unrecoverable error occurs.
 # Datatype: String


Reply via email to