jt2594838 commented on code in PR #12122:
URL: https://github.com/apache/iotdb/pull/12122#discussion_r1574102114


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java:
##########
@@ -283,30 +227,87 @@ protected boolean doCompaction() {
             String.format("%.2f", costTime),
             String.format("%.2f", selectedFileSize / 1024.0d / 1024.0d / 
costTime),
             summary);
-      } finally {
-        Files.deleteIfExists(logFile.toPath());
-        // may failed to set status if the status of target resource is DELETED
-        targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
       }
     } catch (Exception e) {
       isSuccess = false;
       handleException(LOGGER, e);
       recover();
     } finally {
       releaseAllLocks();
+      try {
+        if (logFile != null) {
+          Files.deleteIfExists(logFile.toPath());
+        }
+      } catch (IOException e) {
+        handleException(LOGGER, e);
+      }
+      // may fail to set status if the status of target resource is DELETED
+      targetTsFileResource.setStatus(TsFileResourceStatus.NORMAL);
     }
     return isSuccess;
   }
 
+  protected void compact(SimpleCompactionLogger compactionLogger) throws 
Exception {
+    // carry out the compaction
+    targetTsFileList = new 
ArrayList<>(Collections.singletonList(targetTsFileResource));
+    performer.setSourceFiles(selectedTsFileResourceList);
+    // As elements in targetFiles may be removed in performer, we should use a 
mutable list
+    // instead of Collections.singletonList()
+    performer.setTargetFiles(targetTsFileList);
+    performer.setSummary(summary);
+    performer.perform();
+
+    prepareTargetFiles();
+
+    if (Thread.currentThread().isInterrupted() || summary.isCancel()) {
+      throw new InterruptedException(
+          String.format("%s-%s [Compaction] abort", storageGroupName, 
dataRegionId));
+    }
+
+    validateCompactionResult(
+        sequence ? selectedTsFileResourceList : Collections.emptyList(),
+        sequence ? Collections.emptyList() : selectedTsFileResourceList,
+        targetTsFileList);
+
+    // replace the old files with new file, the new is in same position as the 
old
+    tsFileManager.replace(
+        sequence ? selectedTsFileResourceList : Collections.emptyList(),
+        sequence ? Collections.emptyList() : selectedTsFileResourceList,
+        targetTsFileList,
+        timePartition);
+
+    // release the read lock of all source files, and get the write lock of 
them to delete them
+    for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+      selectedTsFileResourceList.get(i).writeLock();
+      isHoldingWriteLock[i] = true;
+    }

Review Comment:
   Where is “release the read lock of all source files”?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.TTLException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
+import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp;
+import org.apache.iotdb.confignode.persistence.TTLInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
+public class TTLManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TTLManager.class);
+  private final IManager configManager;
+
+  private final TTLInfo ttlInfo;
+
+  private static final int ttlCountThreshold =
+      CommonDescriptor.getInstance().getConfig().getTTLCountThreshold();
+
+  public TTLManager(IManager configManager, TTLInfo ttlInfo) {
+    this.configManager = configManager;
+    this.ttlInfo = ttlInfo;
+  }
+
+  public TSStatus setTTL(SetTTLPlan setTTLPlan) {
+    PartialPath path = new PartialPath(setTTLPlan.getPathPattern());
+    if (!checkIsPathValidated(path)) {
+      TSStatus errorStatus = new 
TSStatus(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
+      errorStatus.setMessage(new 
TTLException(path.getFullPath()).getMessage());

Review Comment:
   Creating an Exception is costly, it should be made use of.
   Provide a static method in TTLException to generate the message or throw the 
exception.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
+
+import org.apache.iotdb.db.service.metrics.FileMetrics;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This settle task contains all_deleted files and partial_deleted files. The 
partial_deleted files
+ * are divided into several groups, each group may contain one or several 
files. This task will do
+ * the following two things respectively: 1. Settle all all_deleted files by 
deleting them directly.
+ * 2. Settle partial_deleted files: put the files of each partial_deleted 
group into an invisible
+ * innerCompactionTask, and then perform the cleanup work. The source files in 
a file group will be
+ * compacted into a target file.
+ */
+public class SettleCompactionTask extends InnerSpaceCompactionTask {
+  private List<TsFileResource> allDeletedFiles;
+  private double allDeletedFileSize = 0;
+  private double partialDeletedFileSize = 0;
+
+  private int allDeletedSuccessNum = 0;
+
+  private long totalModsFileSize;
+
+  public SettleCompactionTask(
+      long timePartition,
+      TsFileManager tsFileManager,
+      List<TsFileResource> allDeletedFiles,
+      List<TsFileResource> partialDeletedFiles,

Review Comment:
   I would suggest the names be `fullyDeletedFiles` and 
`partiallyDeletedFiles`, which is less confusing.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java:
##########
@@ -98,6 +100,8 @@ public void transformToTsBlockColumns(
         .getColumnBuilder(0)
         .writeBinary(new Binary(device.getFullPath(), 
TSFileConfig.STRING_CHARSET));
     int templateId = device.getTemplateId();
+    long ttl = DataNodeTTLCache.getInstance().getTTL(device.getFullPath());
+    String ttlStr = ttl == Long.MAX_VALUE ? IoTDBConstant.TTL_INFINITE : 
String.valueOf(ttl);

Review Comment:
   Now that you present the TTL as text, maybe you can make it more readable, 
like "30 days" or "10 hours", with an option or something.
   Just leave a mark, not a requested change.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java:
##########
@@ -578,7 +583,27 @@ public String nextSeries() {
           List<Modification> modificationsInThisResource =
               modificationCache.computeIfAbsent(
                   resource,
-                  r -> new 
LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
+                  r -> {
+                    List<Modification> list =
+                        new 
LinkedList<>(ModificationFile.getNormalMods(r).getModifications());
+                    // add outdated device mods by ttl
+                    for (IDeviceID device : r.getDevices()) {
+                      long timeLowerBound =
+                          CommonDateTimeUtils.currentTime()
+                              - DataNodeTTLCache.getInstance()
+                                  .getTTL(((PlainDeviceID) 
device).toStringID());

Review Comment:
   Add a `TODO: remove deviceId convertion` here.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties:
##########
@@ -450,6 +450,25 @@ data_replication_factor=1
 # Unit: ms
 # default_ttl_in_ms=-1
 
+# The threshold for the number of TTL stored in the system, the default is 
1000.

Review Comment:
   "Threshold" is not very precise. It means something will happen beyond the 
threshold, but what will happen is not clear. Simply using "maximum" is more 
clear and it indicates the number of TTL cannot go beyond this value.
   Also, "the number of TTL" sounds odd. It should be "the number of TTL rules 
(or settings)". 



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ISettleSelector;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl.DirtyStatus.PARTIAL_DELETED;
+
+public class SettleSelectorImpl implements ISettleSelector {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final boolean heavySelect;
+  private final String storageGroupName;
+  private final String dataRegionId;
+  private final long timePartition;
+  private final TsFileManager tsFileManager;
+  private boolean isSeq;
+
+  public SettleSelectorImpl(
+      boolean heavySelect,
+      String storageGroupName,
+      String dataRegionId,
+      long timePartition,
+      TsFileManager tsFileManager) {
+    this.heavySelect = heavySelect;
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+    this.timePartition = timePartition;
+    this.tsFileManager = tsFileManager;
+  }
+
+  static class AllDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+
+    public void add(TsFileResource resource) {
+      resources.add(resource);
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+  }
+
+  static class PartialDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+    long totalFileSize = 0;
+
+    public boolean add(TsFileResource resource, long dirtyDataSize) {
+      resources.add(resource);
+      totalFileSize += resource.getTsFileSize();
+      totalFileSize -= dirtyDataSize;
+      return checkHasReachedThreshold();
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+
+    public boolean checkHasReachedThreshold() {
+      return resources.size() >= config.getFileLimitPerInnerTask()
+          || totalFileSize >= config.getTargetCompactionFileSize();
+    }
+  }
+
+  @Override
+  public List<SettleCompactionTask> selectSettleTask(List<TsFileResource> 
tsFileResources) {
+    if (tsFileResources.isEmpty()) {
+      return Collections.emptyList();
+    }
+    this.isSeq = tsFileResources.get(0).isSeq();
+    return selectTasks(tsFileResources);
+  }
+
+  private List<SettleCompactionTask> selectTasks(List<TsFileResource> 
resources) {
+    AllDirtyResource allDirtyResource = new AllDirtyResource();
+    List<PartialDirtyResource> partialDirtyResourceList = new ArrayList<>();
+    PartialDirtyResource partialDirtyResource = new PartialDirtyResource();
+    try {
+      for (TsFileResource resource : resources) {
+        if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+          continue;
+        }
+        boolean shouldStop = false;
+        DirtyStatus dirtyStatus;
+        if (!heavySelect) {
+          dirtyStatus = selectFileBaseOnModSize(resource);
+        } else {
+          dirtyStatus = selectFileBaseOnDirtyData(resource);
+        }
+
+        switch (dirtyStatus) {
+          case ALL_DELETED:
+            allDirtyResource.add(resource);
+            break;
+          case PARTIAL_DELETED:
+            shouldStop = partialDirtyResource.add(resource, 
dirtyStatus.getDirtyDataSize());
+            break;
+          case NOT_SATISFIED:
+            shouldStop = !partialDirtyResource.getResources().isEmpty();
+            break;
+          default:
+            // do nothing
+        }
+
+        if (shouldStop) {
+          partialDirtyResourceList.add(partialDirtyResource);
+          partialDirtyResource = new PartialDirtyResource();
+          if (!heavySelect) {
+            // Non-heavy selection is triggered more frequently. In order to 
avoid selecting too
+            // many files containing mods for compaction when the disk is 
insufficient, the number
+            // and size of files are limited here.
+            break;
+          }
+        }
+      }
+      partialDirtyResourceList.add(partialDirtyResource);
+      return creatTask(allDirtyResource, partialDirtyResourceList);
+    } catch (Exception e) {
+      LOGGER.error(
+          "{}-{} cannot select file for settle compaction", storageGroupName, 
dataRegionId, e);
+    }
+    return Collections.emptyList();
+  }
+
+  private DirtyStatus selectFileBaseOnModSize(TsFileResource resource) {
+    ModificationFile modFile = resource.getModFile();
+    if (modFile == null || !modFile.exists()) {
+      return DirtyStatus.NOT_SATISFIED;
+    }
+    return modFile.getSize() > 
config.getInnerCompactionTaskSelectionModsFileThreshold()
+            || (!heavySelect
+                && !CompactionUtils.isDiskHasSpace(
+                    config.getInnerCompactionTaskSelectionDiskRedundancy()))
+        ? PARTIAL_DELETED
+        : DirtyStatus.NOT_SATISFIED;
+  }
+
+  /**
+   * Only when all devices with ttl are deleted may they be selected. On the 
basic of the previous,
+   * only when the number of deleted devices exceeds the threshold or has 
expired for too long will
+   * they be selected.
+   *
+   * @return dirty status means the status of current resource.
+   */
+  private DirtyStatus selectFileBaseOnDirtyData(TsFileResource resource)
+      throws IOException, IllegalPathException {
+    ModificationFile modFile = resource.getModFile();
+    DeviceTimeIndex deviceTimeIndex =
+        resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE
+            ? resource.buildDeviceTimeIndex()
+            : (DeviceTimeIndex) resource.getTimeIndex();
+    Set<IDeviceID> deletedDevices = new HashSet<>();
+    boolean hasExpiredTooLong = false;
+    long currentTime = CommonDateTimeUtils.currentTime();
+
+    Collection<Modification> modifications = modFile.getModifications();
+    for (IDeviceID device : deviceTimeIndex.getDevices()) {
+      // check expired device by ttl
+      long deviceTTL = DataNodeTTLCache.getInstance().getTTL(((PlainDeviceID) 
device).toStringID());

Review Comment:
   Add `TODO: remove deviceId conversion` here.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ISettleSelector;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl.DirtyStatus.PARTIAL_DELETED;
+
+public class SettleSelectorImpl implements ISettleSelector {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final boolean heavySelect;
+  private final String storageGroupName;
+  private final String dataRegionId;
+  private final long timePartition;
+  private final TsFileManager tsFileManager;
+  private boolean isSeq;
+
+  public SettleSelectorImpl(
+      boolean heavySelect,
+      String storageGroupName,
+      String dataRegionId,
+      long timePartition,
+      TsFileManager tsFileManager) {
+    this.heavySelect = heavySelect;
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+    this.timePartition = timePartition;
+    this.tsFileManager = tsFileManager;
+  }
+
+  static class AllDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+
+    public void add(TsFileResource resource) {
+      resources.add(resource);
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+  }
+
+  static class PartialDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+    long totalFileSize = 0;
+
+    public boolean add(TsFileResource resource, long dirtyDataSize) {
+      resources.add(resource);
+      totalFileSize += resource.getTsFileSize();
+      totalFileSize -= dirtyDataSize;
+      return checkHasReachedThreshold();
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+
+    public boolean checkHasReachedThreshold() {
+      return resources.size() >= config.getFileLimitPerInnerTask()
+          || totalFileSize >= config.getTargetCompactionFileSize();
+    }
+  }
+
+  @Override
+  public List<SettleCompactionTask> selectSettleTask(List<TsFileResource> 
tsFileResources) {
+    if (tsFileResources.isEmpty()) {
+      return Collections.emptyList();
+    }
+    this.isSeq = tsFileResources.get(0).isSeq();
+    return selectTasks(tsFileResources);
+  }
+
+  private List<SettleCompactionTask> selectTasks(List<TsFileResource> 
resources) {
+    AllDirtyResource allDirtyResource = new AllDirtyResource();
+    List<PartialDirtyResource> partialDirtyResourceList = new ArrayList<>();
+    PartialDirtyResource partialDirtyResource = new PartialDirtyResource();
+    try {
+      for (TsFileResource resource : resources) {
+        if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+          continue;
+        }
+        boolean shouldStop = false;
+        DirtyStatus dirtyStatus;
+        if (!heavySelect) {
+          dirtyStatus = selectFileBaseOnModSize(resource);
+        } else {
+          dirtyStatus = selectFileBaseOnDirtyData(resource);
+        }
+
+        switch (dirtyStatus) {
+          case ALL_DELETED:
+            allDirtyResource.add(resource);
+            break;
+          case PARTIAL_DELETED:
+            shouldStop = partialDirtyResource.add(resource, 
dirtyStatus.getDirtyDataSize());
+            break;
+          case NOT_SATISFIED:
+            shouldStop = !partialDirtyResource.getResources().isEmpty();
+            break;
+          default:
+            // do nothing
+        }
+
+        if (shouldStop) {
+          partialDirtyResourceList.add(partialDirtyResource);
+          partialDirtyResource = new PartialDirtyResource();
+          if (!heavySelect) {
+            // Non-heavy selection is triggered more frequently. In order to 
avoid selecting too
+            // many files containing mods for compaction when the disk is 
insufficient, the number
+            // and size of files are limited here.
+            break;
+          }
+        }
+      }
+      partialDirtyResourceList.add(partialDirtyResource);
+      return creatTask(allDirtyResource, partialDirtyResourceList);
+    } catch (Exception e) {
+      LOGGER.error(
+          "{}-{} cannot select file for settle compaction", storageGroupName, 
dataRegionId, e);
+    }
+    return Collections.emptyList();
+  }
+
+  private DirtyStatus selectFileBaseOnModSize(TsFileResource resource) {
+    ModificationFile modFile = resource.getModFile();
+    if (modFile == null || !modFile.exists()) {
+      return DirtyStatus.NOT_SATISFIED;
+    }
+    return modFile.getSize() > 
config.getInnerCompactionTaskSelectionModsFileThreshold()
+            || (!heavySelect
+                && !CompactionUtils.isDiskHasSpace(
+                    config.getInnerCompactionTaskSelectionDiskRedundancy()))
+        ? PARTIAL_DELETED
+        : DirtyStatus.NOT_SATISFIED;
+  }
+
+  /**
+   * Only when all devices with ttl are deleted may they be selected. On the 
basic of the previous,
+   * only when the number of deleted devices exceeds the threshold or has 
expired for too long will
+   * they be selected.
+   *
+   * @return dirty status means the status of current resource.
+   */
+  private DirtyStatus selectFileBaseOnDirtyData(TsFileResource resource)
+      throws IOException, IllegalPathException {
+    ModificationFile modFile = resource.getModFile();
+    DeviceTimeIndex deviceTimeIndex =
+        resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE
+            ? resource.buildDeviceTimeIndex()
+            : (DeviceTimeIndex) resource.getTimeIndex();
+    Set<IDeviceID> deletedDevices = new HashSet<>();
+    boolean hasExpiredTooLong = false;
+    long currentTime = CommonDateTimeUtils.currentTime();
+
+    Collection<Modification> modifications = modFile.getModifications();
+    for (IDeviceID device : deviceTimeIndex.getDevices()) {
+      // check expired device by ttl
+      long deviceTTL = DataNodeTTLCache.getInstance().getTTL(((PlainDeviceID) 
device).toStringID());
+      boolean hasSetTTL = deviceTTL != Long.MAX_VALUE;
+      boolean isDeleted =
+          !deviceTimeIndex.isDeviceAlive(device, deviceTTL)
+              || isDeviceDeletedByMods(
+                  modifications,
+                  device,
+                  deviceTimeIndex.getStartTime(device),
+                  deviceTimeIndex.getEndTime(device));
+
+      if (hasSetTTL) {
+        if (!isDeleted) {
+          // For devices with TTL set, all data must expire in order to meet 
the conditions for
+          // being selected.
+          return DirtyStatus.NOT_SATISFIED;
+        }
+        long outdatedTimeDiff = currentTime - 
deviceTimeIndex.getEndTime(device);
+        hasExpiredTooLong =
+            hasExpiredTooLong
+                || outdatedTimeDiff > Math.min(config.getMaxExpiredTime(), 3 * 
deviceTTL);
+      }
+
+      if (isDeleted) {
+        deletedDevices.add(device);
+      }
+    }
+
+    float deletedDeviceRate = (float) (deletedDevices.size()) / 
deviceTimeIndex.getDevices().size();
+    if (deletedDeviceRate == 1f) {
+      // the whole file is completely dirty
+      return DirtyStatus.ALL_DELETED;
+    }
+    hasExpiredTooLong = config.getMaxExpiredTime() != Long.MAX_VALUE && 
hasExpiredTooLong;
+    if (hasExpiredTooLong || deletedDeviceRate >= config.getExpiredDataRate()) 
{
+      // evaluate dirty data size in the tsfile
+      DirtyStatus partialDeleted = DirtyStatus.PARTIAL_DELETED;
+      partialDeleted.setDirtyDataSize((long) (deletedDeviceRate * 
resource.getTsFileSize()));
+      return partialDeleted;

Review Comment:
   Enum instances are static, so this use will affect other threads; please use 
a different implementation.
   ```
   enum Test{
       T1;
       public int num = 0;
     }
     public static void main(String[] args) {
       Test t1 = Test.T1;
       Test t11 = Test.T1;
       t1.num = 10;
       System.out.println(t11.num);
     }
   ```



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionRecoverTest.java:
##########
@@ -0,0 +1,1137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.settle;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+public class SettleCompactionRecoverTest extends AbstractCompactionTest {
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
+    super.setUp();
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(512);
+    IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(100);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);

Review Comment:
   Record the original settings and recover them after tests.
   Check all new tests you have added.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties:
##########
@@ -450,6 +450,25 @@ data_replication_factor=1
 # Unit: ms
 # default_ttl_in_ms=-1
 
+# The threshold for the number of TTL stored in the system, the default is 
1000.
+# Negative value means the threshold is unlimited.
+# Datatype: int
+# ttl_count_threshold=1000
+
+# The interval of ttl check task in each database. Default is 2 hours.
+# Notice: It is not recommended to change it too small, as it will affect the 
read and write performance of the system.

Review Comment:
   To make the user more aware of this config's effect, add a short explanation 
about what a TTL check task does.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java:
##########
@@ -119,6 +119,9 @@ public class CommonConfig {
    */
   private long[] tierTTLInMs = {Long.MAX_VALUE};
 
+  /** The threshold for the number of TTL stored in the system, the default is 
1000. */
+  private int TTLCountThreshold = 1000;

Review Comment:
   As in the properties file, it is better to use a more precise name.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.
+   *
+   * @param nodes path to be removed
+   */
+  public void unsetTTL(String[] nodes) {
+    if (nodes.length < 2) {
+      return;
+    } else if (nodes.length == 2) {
+      // if path equals to root.**, then unset it to configured ttl
+      if (nodes[0].equals(IoTDBConstant.PATH_ROOT)
+          && nodes[1].equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+        ttlCacheTree.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD).ttl =
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs();

Review Comment:
   What about the time precision?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ISettleSelector;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl.DirtyStatus.PARTIAL_DELETED;
+
+public class SettleSelectorImpl implements ISettleSelector {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final boolean heavySelect;
+  private final String storageGroupName;
+  private final String dataRegionId;
+  private final long timePartition;
+  private final TsFileManager tsFileManager;
+  private boolean isSeq;
+
+  public SettleSelectorImpl(
+      boolean heavySelect,
+      String storageGroupName,
+      String dataRegionId,
+      long timePartition,
+      TsFileManager tsFileManager) {
+    this.heavySelect = heavySelect;
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+    this.timePartition = timePartition;
+    this.tsFileManager = tsFileManager;
+  }
+
+  static class AllDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+
+    public void add(TsFileResource resource) {
+      resources.add(resource);
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+  }
+
+  static class PartialDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+    long totalFileSize = 0;
+
+    public boolean add(TsFileResource resource, long dirtyDataSize) {
+      resources.add(resource);
+      totalFileSize += resource.getTsFileSize();
+      totalFileSize -= dirtyDataSize;
+      return checkHasReachedThreshold();
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+
+    public boolean checkHasReachedThreshold() {
+      return resources.size() >= config.getFileLimitPerInnerTask()
+          || totalFileSize >= config.getTargetCompactionFileSize();
+    }
+  }
+
+  @Override
+  public List<SettleCompactionTask> selectSettleTask(List<TsFileResource> 
tsFileResources) {
+    if (tsFileResources.isEmpty()) {
+      return Collections.emptyList();
+    }
+    this.isSeq = tsFileResources.get(0).isSeq();
+    return selectTasks(tsFileResources);
+  }
+
+  private List<SettleCompactionTask> selectTasks(List<TsFileResource> 
resources) {
+    AllDirtyResource allDirtyResource = new AllDirtyResource();
+    List<PartialDirtyResource> partialDirtyResourceList = new ArrayList<>();
+    PartialDirtyResource partialDirtyResource = new PartialDirtyResource();
+    try {
+      for (TsFileResource resource : resources) {
+        if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
+          continue;
+        }
+        boolean shouldStop = false;
+        DirtyStatus dirtyStatus;
+        if (!heavySelect) {
+          dirtyStatus = selectFileBaseOnModSize(resource);
+        } else {
+          dirtyStatus = selectFileBaseOnDirtyData(resource);
+        }
+
+        switch (dirtyStatus) {
+          case ALL_DELETED:
+            allDirtyResource.add(resource);
+            break;
+          case PARTIAL_DELETED:
+            shouldStop = partialDirtyResource.add(resource, 
dirtyStatus.getDirtyDataSize());
+            break;
+          case NOT_SATISFIED:
+            shouldStop = !partialDirtyResource.getResources().isEmpty();
+            break;
+          default:
+            // do nothing
+        }
+
+        if (shouldStop) {
+          partialDirtyResourceList.add(partialDirtyResource);
+          partialDirtyResource = new PartialDirtyResource();
+          if (!heavySelect) {
+            // Non-heavy selection is triggered more frequently. In order to 
avoid selecting too
+            // many files containing mods for compaction when the disk is 
insufficient, the number
+            // and size of files are limited here.
+            break;
+          }
+        }
+      }
+      partialDirtyResourceList.add(partialDirtyResource);
+      return creatTask(allDirtyResource, partialDirtyResourceList);
+    } catch (Exception e) {
+      LOGGER.error(
+          "{}-{} cannot select file for settle compaction", storageGroupName, 
dataRegionId, e);
+    }
+    return Collections.emptyList();
+  }
+
+  private DirtyStatus selectFileBaseOnModSize(TsFileResource resource) {
+    ModificationFile modFile = resource.getModFile();
+    if (modFile == null || !modFile.exists()) {
+      return DirtyStatus.NOT_SATISFIED;
+    }
+    return modFile.getSize() > 
config.getInnerCompactionTaskSelectionModsFileThreshold()
+            || (!heavySelect
+                && !CompactionUtils.isDiskHasSpace(
+                    config.getInnerCompactionTaskSelectionDiskRedundancy()))
+        ? PARTIAL_DELETED
+        : DirtyStatus.NOT_SATISFIED;
+  }
+
+  /**
+   * Only when all devices with ttl are deleted may they be selected. On the 
basic of the previous,
+   * only when the number of deleted devices exceeds the threshold or has 
expired for too long will
+   * they be selected.
+   *
+   * @return dirty status means the status of current resource.
+   */
+  private DirtyStatus selectFileBaseOnDirtyData(TsFileResource resource)
+      throws IOException, IllegalPathException {
+    ModificationFile modFile = resource.getModFile();
+    DeviceTimeIndex deviceTimeIndex =
+        resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE
+            ? resource.buildDeviceTimeIndex()
+            : (DeviceTimeIndex) resource.getTimeIndex();
+    Set<IDeviceID> deletedDevices = new HashSet<>();
+    boolean hasExpiredTooLong = false;
+    long currentTime = CommonDateTimeUtils.currentTime();
+
+    Collection<Modification> modifications = modFile.getModifications();
+    for (IDeviceID device : deviceTimeIndex.getDevices()) {
+      // check expired device by ttl
+      long deviceTTL = DataNodeTTLCache.getInstance().getTTL(((PlainDeviceID) 
device).toStringID());
+      boolean hasSetTTL = deviceTTL != Long.MAX_VALUE;
+      boolean isDeleted =
+          !deviceTimeIndex.isDeviceAlive(device, deviceTTL)
+              || isDeviceDeletedByMods(
+                  modifications,
+                  device,
+                  deviceTimeIndex.getStartTime(device),
+                  deviceTimeIndex.getEndTime(device));
+
+      if (hasSetTTL) {
+        if (!isDeleted) {
+          // For devices with TTL set, all data must expire in order to meet 
the conditions for
+          // being selected.
+          return DirtyStatus.NOT_SATISFIED;
+        }
+        long outdatedTimeDiff = currentTime - 
deviceTimeIndex.getEndTime(device);
+        hasExpiredTooLong =
+            hasExpiredTooLong
+                || outdatedTimeDiff > Math.min(config.getMaxExpiredTime(), 3 * 
deviceTTL);
+      }
+
+      if (isDeleted) {
+        deletedDevices.add(device);
+      }
+    }
+
+    float deletedDeviceRate = (float) (deletedDevices.size()) / 
deviceTimeIndex.getDevices().size();
+    if (deletedDeviceRate == 1f) {

Review Comment:
   Absolutely not, should just compare the two sizes in this branch.
   ```
   public static void main(String[] args) {
       System.out.println((float) (Integer.MAX_VALUE - 2) / (Integer.MAX_VALUE) 
== 1f);
     }
   ```



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.
+   *
+   * @param nodes path to be removed
+   */
+  public void unsetTTL(String[] nodes) {
+    if (nodes.length < 2) {
+      return;
+    } else if (nodes.length == 2) {
+      // if path equals to root.**, then unset it to configured ttl
+      if (nodes[0].equals(IoTDBConstant.PATH_ROOT)
+          && nodes[1].equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+        ttlCacheTree.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD).ttl =
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs();
+        return;
+      }
+    }
+    CacheNode parent = ttlCacheTree;

Review Comment:
   I am not sure how many people would use "parent", but I think "current" is 
easier to follow.
   The name keeps me wondering, "Whose parent is this?"



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.
+   *
+   * @param nodes path to be removed
+   */
+  public void unsetTTL(String[] nodes) {
+    if (nodes.length < 2) {
+      return;
+    } else if (nodes.length == 2) {
+      // if path equals to root.**, then unset it to configured ttl
+      if (nodes[0].equals(IoTDBConstant.PATH_ROOT)
+          && nodes[1].equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+        ttlCacheTree.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD).ttl =
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs();
+        return;
+      }
+    }
+    CacheNode parent = ttlCacheTree;
+    int index = 0;
+    boolean usefulFlag;
+    CacheNode parentOfSubPathToBeRemoved = null;
+    for (int i = 1; i < nodes.length; i++) {
+      usefulFlag = !parent.getChildren().isEmpty() || parent.ttl != NULL_TTL;
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        // there is no matching path on ttl cache tree
+        return;
+      }
+      if (usefulFlag) {
+        parentOfSubPathToBeRemoved = parent;
+        index = i;
+      }
+      parent = child;
+    }
+    // currently, parent is the leaf node of the path to be removed
+    if (parent.ttl != NULL_TTL) {
+      ttlCount--;
+    }
+
+    if (!parent.getChildren().isEmpty()) {
+      // node to be removed is internal node, then just reset its ttl
+      parent.ttl = NULL_TTL;
+      return;
+    }
+
+    // node to be removed is leaf node, then remove corresponding node of this 
path from cache tree
+    if (parentOfSubPathToBeRemoved != null) {
+      parentOfSubPathToBeRemoved.removeChild(nodes[index]);
+    }
+  }
+
+  /**
+   * Get ttl from cache tree. Return the TTL of the node closest to the path 
leaf node that has a
+   * TTL which is not NULL_TTL.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public long getTTL(String[] nodes) {
+    long ttl = ttlCacheTree.ttl;
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = 
parent.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD);
+      ttl = child != null ? child.ttl : ttl;
+      parent = parent.getChild(nodes[i]);
+      if (parent == null) {
+        break;
+      }
+    }
+    ttl = parent != null && parent.ttl != NULL_TTL ? parent.ttl : ttl;
+    return ttl;
+  }
+
+  public Map<String, Long> getAllTTLUnderOneNode(String[] nodes) {
+    Map<String, Long> pathTTLMap = new HashMap<>();
+    CacheNode node = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      node = node.getChild(nodes[i]);
+      if (node == null) {
+        return pathTTLMap;
+      }
+    }
+
+    // get all ttl under current node
+    dfsCacheTree(pathTTLMap, new StringBuilder(new 
PartialPath(nodes).getFullPath()), node);
+    return pathTTLMap;
+  }
+
+  /**
+   * Return the ttl of path. If the path does not exist, it means that the TTL 
is not set, and
+   * return NULL_TTL.
+   */
+  public long getNodeTTL(String[] nodes) {
+    CacheNode node = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      node = node.getChild(nodes[i]);
+      if (node == null) {
+        return NULL_TTL;
+      }
+    }
+    return node.ttl;
+  }
+
+  public Map<String, Long> getAllPathTTL() {
+    Map<String, Long> result = new LinkedHashMap<>();
+    dfsCacheTree(result, new StringBuilder(IoTDBConstant.PATH_ROOT), 
ttlCacheTree);
+    return result;
+  }
+
+  private void dfsCacheTree(Map<String, Long> pathTTLMap, StringBuilder path, 
CacheNode node) {
+    if (node.ttl != NULL_TTL) {
+      pathTTLMap.put(path.toString(), node.ttl);
+    }
+    int idx = path.length();
+    for (Map.Entry<String, CacheNode> entry : node.getChildren().entrySet()) {
+      dfsCacheTree(
+          pathTTLMap,
+          
path.append(IoTDBConstant.PATH_SEPARATOR).append(entry.getValue().name),
+          entry.getValue());
+      path.delete(idx, path.length());
+    }
+  }
+
+  public int getTtlCount() {
+    return ttlCount;
+  }
+
+  public void serialize(BufferedOutputStream outputStream) throws IOException {
+    Map<String, Long> allPathTTLMap = getAllPathTTL();
+    ReadWriteIOUtils.write(allPathTTLMap.size(), outputStream);
+    for (Map.Entry<String, Long> entry : allPathTTLMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+    outputStream.flush();
+  }
+
+  public void deserialize(BufferedInputStream bufferedInputStream) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(bufferedInputStream);
+    while (size > 0) {
+      String path = ReadWriteIOUtils.readString(bufferedInputStream);
+      long ttl = ReadWriteIOUtils.readLong(bufferedInputStream);
+      setTTL(Objects.requireNonNull(path).split("\\" + 
IoTDBConstant.PATH_SEPARATOR), ttl);
+      size--;
+    }
+  }

Review Comment:
   Why does the parameter have to be `BufferedOutputStream` instead of the 
normal `OutputStream`?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class TTLInfo implements SnapshotProcessor {
+  private static final String SNAPSHOT_FILENAME = "ttl_info.bin";
+  private static final Logger LOGGER = LoggerFactory.getLogger(TTLInfo.class);
+
+  private final TTLCache ttlCache;
+
+  private final ReadWriteLock lock;
+
+  public TTLInfo() {
+    ttlCache = new TTLCache();
+    lock = new ReentrantReadWriteLock();
+  }
+
+  public TSStatus setTTL(SetTTLPlan plan) {
+    lock.writeLock().lock();
+    try {
+      ttlCache.setTTL(plan.getPathPattern(), plan.getTTL());
+      if (plan.isDataBase()) {
+        // set ttl to path.**
+        ttlCache.setTTL(
+            new PartialPath(plan.getPathPattern())
+                .concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)
+                .getNodes(),
+            plan.getTTL());
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public TSStatus unsetTTL(SetTTLPlan plan) {
+    lock.writeLock().lock();
+    try {
+      ttlCache.unsetTTL(plan.getPathPattern());
+      if (plan.isDataBase()) {
+        // unset ttl to path.**
+        ttlCache.unsetTTL(
+            new PartialPath(plan.getPathPattern())
+                .concatNode(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)
+                .getNodes());
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public ShowTTLResp showAllTTL() {
+    lock.readLock().lock();
+    ShowTTLResp resp = new ShowTTLResp();
+    try {
+      Map<String, Long> pathTTLMap = ttlCache.getAllPathTTL();
+      resp.setPathTTLMap(pathTTLMap);
+      resp.setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    } finally {
+      lock.readLock().unlock();
+    }
+    return resp;
+  }

Review Comment:
   I think only line 97 should stay in the lock block.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java:
##########
@@ -483,6 +476,13 @@ private void releaseAllLocks() {
 
   @Override
   public long getEstimatedMemoryCost() {
+    if (innerSpaceEstimator == null) {
+      if (this.performer instanceof ReadChunkCompactionPerformer) {
+        innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
+      } else if (this.performer instanceof FastCompactionPerformer) {
+        innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
+      }

Review Comment:
   Seeing this, I wonder if the CompactionPerformer interface can provide a 
SpaceEstimator and then you will not have to do the type check.
   To put it simply, `innerSpaceEstimator = performer.getSpaceEstimator()` may 
be enough.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DataNodeTTLCache {
+  private final TTLCache ttlCache;
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private DataNodeTTLCache() {
+    ttlCache = new TTLCache();
+  }
+
+  public static DataNodeTTLCache getInstance() {
+    return DataNodeTTLCacheHolder.INSTANCE;
+  }
+
+  private static class DataNodeTTLCacheHolder {
+    private static final DataNodeTTLCache INSTANCE = new DataNodeTTLCache();
+  }
+
+  public void setTTL(String path, long ttl) {
+    lock.writeLock().lock();
+    try {
+      ttlCache.setTTL(path.split("\\" + IoTDBConstant.PATH_SEPARATOR), ttl);

Review Comment:
   There is a constant for this, PATH_SEPARATER_NO_REGEX, you may use that.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ISettleSelector;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl.DirtyStatus.PARTIAL_DELETED;
+
+public class SettleSelectorImpl implements ISettleSelector {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final boolean heavySelect;
+  private final String storageGroupName;
+  private final String dataRegionId;
+  private final long timePartition;
+  private final TsFileManager tsFileManager;
+  private boolean isSeq;
+
+  public SettleSelectorImpl(
+      boolean heavySelect,
+      String storageGroupName,
+      String dataRegionId,
+      long timePartition,
+      TsFileManager tsFileManager) {
+    this.heavySelect = heavySelect;
+    this.storageGroupName = storageGroupName;
+    this.dataRegionId = dataRegionId;
+    this.timePartition = timePartition;
+    this.tsFileManager = tsFileManager;
+  }
+
+  static class AllDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+
+    public void add(TsFileResource resource) {
+      resources.add(resource);
+    }
+
+    public List<TsFileResource> getResources() {
+      return resources;
+    }
+  }
+
+  static class PartialDirtyResource {
+    List<TsFileResource> resources = new ArrayList<>();
+    long totalFileSize = 0;

Review Comment:
   Also, `FullyDirtyResource` and `PartiallyDirtyResource` may be less 
confusing.
   By the way, `class PartialDirtyResource` may have something that 
List<TsFileResource> cannot do (actually the streaming API does), but `class 
AllDirtyResource` does not seem a necessary abstraction since  
List<TsFileResource> does all it can.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ISettleSelector;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl.DirtyStatus.PARTIAL_DELETED;
+
+public class SettleSelectorImpl implements ISettleSelector {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final boolean heavySelect;
+  private final String storageGroupName;
+  private final String dataRegionId;
+  private final long timePartition;
+  private final TsFileManager tsFileManager;
+  private boolean isSeq;
+
+  public SettleSelectorImpl(
+      boolean heavySelect,

Review Comment:
   You'd better add some comments for arguments that are not very 
straightforward.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties:
##########
@@ -450,6 +450,25 @@ data_replication_factor=1
 # Unit: ms
 # default_ttl_in_ms=-1
 
+# The threshold for the number of TTL stored in the system, the default is 
1000.
+# Negative value means the threshold is unlimited.
+# Datatype: int
+# ttl_count_threshold=1000
+
+# The interval of ttl check task in each database. Default is 2 hours.
+# Notice: It is not recommended to change it too small, as it will affect the 
read and write performance of the system.
+# Unit: ms
+# ttl_check_interval=7200000
+
+# The max expired time of device set with ttl. If the expired time exceeds 
this value, then the expired data will be cleaned by compaction. Default is 1 
month.

Review Comment:
   "device set" and "the expired time" can be confusing.
   Possible rephrase: 
   The expiring time of devices that have a ttl. If the data elapsed time 
(current timestamp minus the minimum data timestamp of the device) of such 
devices exceeds this value, then the data will be cleaned by 
compaction—default: 1 month.



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java:
##########
@@ -277,6 +278,7 @@ public void 
testProgressIndexMinimumProgressIndexTopologicalSort() {
     }
   }
 
+  @Ignore

Review Comment:
   Please add a comment to explain this.



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerWithFastPerformerTest.java:
##########
@@ -154,7 +154,7 @@ public void test1() throws IOException, MetadataException, 
InterruptedException
         .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
     String sgName = COMPACTION_TEST_SG + "test1";
     try {
-      CompactionTaskManager.getInstance().restart();
+      // CompactionTaskManager.getInstance().restart();

Review Comment:
   Remove or revert.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties:
##########
@@ -450,6 +450,25 @@ data_replication_factor=1
 # Unit: ms
 # default_ttl_in_ms=-1
 
+# The threshold for the number of TTL stored in the system, the default is 
1000.
+# Negative value means the threshold is unlimited.
+# Datatype: int
+# ttl_count_threshold=1000
+
+# The interval of ttl check task in each database. Default is 2 hours.
+# Notice: It is not recommended to change it too small, as it will affect the 
read and write performance of the system.
+# Unit: ms
+# ttl_check_interval=7200000
+
+# The max expired time of device set with ttl. If the expired time exceeds 
this value, then the expired data will be cleaned by compaction. Default is 1 
month.
+# Notice: It is not recommended to change it too small, as it will affect the 
read and write performance of the system.
+# Unit: ms
+# max_expired_time=2592000000
+
+# The expired device rate. If the number of expired devices in one tsfile 
exceeds this value, then expired data of this tsfile will be cleaned by 
compaction.

Review Comment:
   Notice the difference between "rate" and "ratio".
   "number" -> "ratio", keep the terms consistent.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);

Review Comment:
   Maybe `addChild` can return the newly added node so that you will not need 
to get it again.
   Or you may even modify `getChild` to resemble `computeIfAbsent` in `Map`.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties:
##########
@@ -450,6 +450,25 @@ data_replication_factor=1
 # Unit: ms
 # default_ttl_in_ms=-1
 
+# The threshold for the number of TTL stored in the system, the default is 
1000.
+# Negative value means the threshold is unlimited.
+# Datatype: int
+# ttl_count_threshold=1000
+
+# The interval of ttl check task in each database. Default is 2 hours.
+# Notice: It is not recommended to change it too small, as it will affect the 
read and write performance of the system.
+# Unit: ms
+# ttl_check_interval=7200000
+
+# The max expired time of device set with ttl. If the expired time exceeds 
this value, then the expired data will be cleaned by compaction. Default is 1 
month.
+# Notice: It is not recommended to change it too small, as it will affect the 
read and write performance of the system.
+# Unit: ms
+# max_expired_time=2592000000
+
+# The expired device rate. If the number of expired devices in one tsfile 
exceeds this value, then expired data of this tsfile will be cleaned by 
compaction.
+# Unit: float

Review Comment:
   There is no need for this; ratio does not have a unit, and float is not a 
unit.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/TTLException.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.exception;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+public class TTLException extends Exception {
+
+  public TTLException(String path) {
+    super(
+        String.format(
+            "Illegal pattern path: %s, pattern path should end with **, 
otherwise, it should be a specific database or device path without *",
+            path));
+  }
+
+  public TTLException() {
+    super(
+        String.format(
+            "The number of TTL stored in the system has reached threshold %d, 
please increase the ttl_count parameter.",
+            
CommonDescriptor.getInstance().getConfig().getTTLCountThreshold()));
+  }
+}

Review Comment:
   This can be confusing. It is better to just use two exception classes.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.
+   *
+   * @param nodes path to be removed
+   */
+  public void unsetTTL(String[] nodes) {
+    if (nodes.length < 2) {
+      return;
+    } else if (nodes.length == 2) {
+      // if path equals to root.**, then unset it to configured ttl
+      if (nodes[0].equals(IoTDBConstant.PATH_ROOT)
+          && nodes[1].equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+        ttlCacheTree.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD).ttl =
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs();
+        return;
+      }
+    }
+    CacheNode parent = ttlCacheTree;
+    int index = 0;
+    boolean usefulFlag;

Review Comment:
   May something like "hasNonDefaultTTL" or "hasCustomizedTTL"?



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.

Review Comment:
   Useless or useful is very vague. Please use a more precise term or add some 
explanation.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.
+   *
+   * @param nodes path to be removed
+   */
+  public void unsetTTL(String[] nodes) {
+    if (nodes.length < 2) {
+      return;
+    } else if (nodes.length == 2) {
+      // if path equals to root.**, then unset it to configured ttl
+      if (nodes[0].equals(IoTDBConstant.PATH_ROOT)
+          && nodes[1].equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+        ttlCacheTree.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD).ttl =
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs();
+        return;
+      }
+    }
+    CacheNode parent = ttlCacheTree;
+    int index = 0;
+    boolean usefulFlag;
+    CacheNode parentOfSubPathToBeRemoved = null;
+    for (int i = 1; i < nodes.length; i++) {
+      usefulFlag = !parent.getChildren().isEmpty() || parent.ttl != NULL_TTL;
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        // there is no matching path on ttl cache tree
+        return;
+      }
+      if (usefulFlag) {
+        parentOfSubPathToBeRemoved = parent;
+        index = i;
+      }
+      parent = child;
+    }
+    // currently, parent is the leaf node of the path to be removed
+    if (parent.ttl != NULL_TTL) {
+      ttlCount--;
+    }
+
+    if (!parent.getChildren().isEmpty()) {
+      // node to be removed is internal node, then just reset its ttl
+      parent.ttl = NULL_TTL;
+      return;
+    }
+
+    // node to be removed is leaf node, then remove corresponding node of this 
path from cache tree
+    if (parentOfSubPathToBeRemoved != null) {
+      parentOfSubPathToBeRemoved.removeChild(nodes[index]);
+    }
+  }
+
+  /**
+   * Get ttl from cache tree. Return the TTL of the node closest to the path 
leaf node that has a
+   * TTL which is not NULL_TTL.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public long getTTL(String[] nodes) {
+    long ttl = ttlCacheTree.ttl;
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = 
parent.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD);
+      ttl = child != null ? child.ttl : ttl;
+      parent = parent.getChild(nodes[i]);
+      if (parent == null) {
+        break;
+      }
+    }
+    ttl = parent != null && parent.ttl != NULL_TTL ? parent.ttl : ttl;
+    return ttl;
+  }
+
+  public Map<String, Long> getAllTTLUnderOneNode(String[] nodes) {
+    Map<String, Long> pathTTLMap = new HashMap<>();
+    CacheNode node = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      node = node.getChild(nodes[i]);
+      if (node == null) {
+        return pathTTLMap;
+      }
+    }
+
+    // get all ttl under current node
+    dfsCacheTree(pathTTLMap, new StringBuilder(new 
PartialPath(nodes).getFullPath()), node);

Review Comment:
   Why creating a PartialPath instead of joining the nodes directly?



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/ttl/TTLCache.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.schema.ttl;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@NotThreadSafe
+public class TTLCache {
+
+  private final CacheNode ttlCacheTree;
+  public static final long NULL_TTL = -1;
+
+  private int ttlCount;
+
+  public TTLCache() {
+    ttlCacheTree = new CacheNode(IoTDBConstant.PATH_ROOT);
+    long defaultTTL =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    defaultTTL = defaultTTL <= 0 ? Long.MAX_VALUE : defaultTTL;
+    ttlCacheTree.addChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD, defaultTTL);
+    ttlCount = 1;
+  }
+
+  /**
+   * Put ttl into cache tree.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public void setTTL(String[] nodes, long ttl) {
+    if (nodes.length < 2 || ttl <= 0) {
+      return;
+    }
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        parent.addChild(nodes[i], NULL_TTL);
+        child = parent.getChild(nodes[i]);
+      }
+      parent = child;
+    }
+    if (parent.ttl == NULL_TTL) {
+      ttlCount++;
+    }
+    parent.ttl = ttl;
+  }
+
+  /**
+   * Unset ttl and remove all useless nodes. If the path to be removed is 
internal node, then just
+   * reset its ttl. Else, find the sub path and remove it.
+   *
+   * @param nodes path to be removed
+   */
+  public void unsetTTL(String[] nodes) {
+    if (nodes.length < 2) {
+      return;
+    } else if (nodes.length == 2) {
+      // if path equals to root.**, then unset it to configured ttl
+      if (nodes[0].equals(IoTDBConstant.PATH_ROOT)
+          && nodes[1].equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
+        ttlCacheTree.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD).ttl =
+            CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs();
+        return;
+      }
+    }
+    CacheNode parent = ttlCacheTree;
+    int index = 0;
+    boolean usefulFlag;
+    CacheNode parentOfSubPathToBeRemoved = null;
+    for (int i = 1; i < nodes.length; i++) {
+      usefulFlag = !parent.getChildren().isEmpty() || parent.ttl != NULL_TTL;
+      CacheNode child = parent.getChild(nodes[i]);
+      if (child == null) {
+        // there is no matching path on ttl cache tree
+        return;
+      }
+      if (usefulFlag) {
+        parentOfSubPathToBeRemoved = parent;
+        index = i;
+      }
+      parent = child;
+    }
+    // currently, parent is the leaf node of the path to be removed
+    if (parent.ttl != NULL_TTL) {
+      ttlCount--;
+    }
+
+    if (!parent.getChildren().isEmpty()) {
+      // node to be removed is internal node, then just reset its ttl
+      parent.ttl = NULL_TTL;
+      return;
+    }
+
+    // node to be removed is leaf node, then remove corresponding node of this 
path from cache tree
+    if (parentOfSubPathToBeRemoved != null) {
+      parentOfSubPathToBeRemoved.removeChild(nodes[index]);
+    }
+  }
+
+  /**
+   * Get ttl from cache tree. Return the TTL of the node closest to the path 
leaf node that has a
+   * TTL which is not NULL_TTL.
+   *
+   * @param nodes should be prefix path or specific device path without 
wildcard
+   */
+  public long getTTL(String[] nodes) {
+    long ttl = ttlCacheTree.ttl;
+    CacheNode parent = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      CacheNode child = 
parent.getChild(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD);
+      ttl = child != null ? child.ttl : ttl;
+      parent = parent.getChild(nodes[i]);
+      if (parent == null) {
+        break;
+      }
+    }
+    ttl = parent != null && parent.ttl != NULL_TTL ? parent.ttl : ttl;
+    return ttl;
+  }
+
+  public Map<String, Long> getAllTTLUnderOneNode(String[] nodes) {
+    Map<String, Long> pathTTLMap = new HashMap<>();
+    CacheNode node = ttlCacheTree;
+    for (int i = 1; i < nodes.length; i++) {
+      node = node.getChild(nodes[i]);
+      if (node == null) {
+        return pathTTLMap;
+      }
+    }
+
+    // get all ttl under current node
+    dfsCacheTree(pathTTLMap, new StringBuilder(new 
PartialPath(nodes).getFullPath()), node);
+    return pathTTLMap;
+  }
+
+  /**
+   * Return the ttl of path. If the path does not exist, it means that the TTL 
is not set, and
+   * return NULL_TTL.
+   */
+  public long getNodeTTL(String[] nodes) {

Review Comment:
   I suggest changing `getTTL` and `getNodeTTL` to `getCloestTTL` and 
`getLastNodeTTL` to make them more distinguishable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to