This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 96ee5a8  [IOTDB-1372]Enhance management of TsFileResource (#4003)
96ee5a8 is described below

commit 96ee5a83fb05e5b9f72683125ec7e376c7eff258
Author: Yuting Yan <[email protected]>
AuthorDate: Sat Oct 9 01:15:17 2021 -0500

    [IOTDB-1372]Enhance management of TsFileResource (#4003)
---
 pom.xml                                            |   2 +-
 .../resources/conf/iotdb-engine.properties         |   5 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +
 .../level/LevelCompactionTsFileManagement.java     |  15 +
 .../no/NoCompactionTsFileManagement.java           |   8 +
 .../engine/storagegroup/StorageGroupProcessor.java | 121 ++++---
 .../db/engine/storagegroup/TsFileResource.java     |  36 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  68 +++-
 .../storagegroup/timeindex/FileTimeIndex.java      |  22 ++
 .../engine/storagegroup/timeindex/ITimeIndex.java  |  24 ++
 .../iotdb/db/rescon/TsFileResourceManager.java     | 121 +++++++
 .../integration/IoTDBManageTsFileResourceIT.java   | 291 +++++++++++++++
 .../iotdb/db/rescon/ResourceManagerTest.java       | 401 +++++++++++++++++++++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   5 +
 15 files changed, 1063 insertions(+), 75 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7330d54..3b3fe13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1083,7 +1083,7 @@
             </activation>
             <properties>
                 <maven.compiler.release>8</maven.compiler.release>
-                <argLine>--illegal-access=permit 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.t [...]
+                <argLine>--illegal-access=permit 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.java [...]
             </properties>
         </profile>
         <!--
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index f0745fb..31b1bc8 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -359,6 +359,10 @@ timestamp_precision=ms
 # Datatype: double
 # flush_proportion=0.4
 
+# Ratio of read memory allocated for timeIndex, 0.2 by default
+# Datatype: double
+# time_index_memory_proportion=0.2
+
 # Ratio of write memory allocated for buffered arrays, 0.6 by default
 # Datatype: double
 # buffered_arrays_memory_proportion=0.6
@@ -405,7 +409,6 @@ timestamp_precision=ms
 # Datatype: int
 # upgrade_thread_num=1
 
-
 ####################
 ### Query Configurations
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7ab0170..d3b218e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -125,6 +125,9 @@ public class IoTDBConfig {
   /** Ratio of memory allocated for buffered arrays */
   private double bufferedArraysMemoryProportion = 0.6;
 
+  /** Memory allocated proportion for timeIndex */
+  private double timeIndexMemoryProportion = 0.2;
+
   /** Flush proportion for system */
   private double flushProportion = 0.4;
 
@@ -1382,6 +1385,14 @@ public class IoTDBConfig {
     this.bufferedArraysMemoryProportion = bufferedArraysMemoryProportion;
   }
 
+  public double getTimeIndexMemoryProportion() {
+    return timeIndexMemoryProportion;
+  }
+
+  public void setTimeIndexMemoryProportion(double timeIndexMemoryProportion) {
+    this.timeIndexMemoryProportion = timeIndexMemoryProportion;
+  }
+
   public double getFlushProportion() {
     return flushProportion;
   }
@@ -1422,7 +1433,7 @@ public class IoTDBConfig {
     this.allocateMemoryForSchema = allocateMemoryForSchema;
   }
 
-  long getAllocateMemoryForRead() {
+  public long getAllocateMemoryForRead() {
     return allocateMemoryForRead;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f6adf66..849625a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -175,6 +175,12 @@ public class IoTDBDescriptor {
                   "buffered_arrays_memory_proportion",
                   Double.toString(conf.getBufferedArraysMemoryProportion()))));
 
+      conf.setTimeIndexMemoryProportion(
+          Double.parseDouble(
+              properties.getProperty(
+                  "time_index_memory_proportion",
+                  Double.toString(conf.getTimeIndexMemoryProportion()))));
+
       conf.setFlushProportion(
           Double.parseDouble(
               properties.getProperty(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 03c14ba..f8fa374 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -89,6 +90,9 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
   private final List<TsFileResource> sequenceRecoverTsFileResources = new 
ArrayList<>();
   private final List<TsFileResource> unSequenceRecoverTsFileResources = new 
ArrayList<>();
 
+  /** manage TsFileResource degrade */
+  private TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();
+
   public LevelCompactionTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
     clear();
@@ -149,6 +153,9 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
         }
       }
     }
+    for (TsFileResource tsFileResource : mergeTsFiles) {
+      tsFileResourceManager.removeTsFileResource(tsFileResource);
+    }
   }
 
   private void deleteLevelFile(TsFileResource seqFile) {
@@ -238,6 +245,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
     } finally {
       writeUnlock();
     }
+    tsFileResourceManager.removeTsFileResource(tsFileResource);
   }
 
   @Override
@@ -262,6 +270,9 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
     } finally {
       writeUnlock();
     }
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      tsFileResourceManager.removeTsFileResource(tsFileResource);
+    }
   }
 
   @Override
@@ -475,6 +486,8 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           }
           // complete compaction and delete source file
           deleteAllSubLevelFiles(isSeq, timePartition);
+
+          
tsFileResourceManager.registerSealedTsFileResource(targetTsFileResource);
         } else {
           // get tsfile resource from list, as they have been recovered in 
StorageGroupProcessor
           TsFileResource targetResource = getRecoverTsFileResource(targetFile, 
isSeq);
@@ -519,6 +532,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
                 unSequenceRecoverTsFileResources.clear();
               }
               deleteLevelFilesInList(timePartition, sourceTsFileResources, 
level, isSeq);
+              
tsFileResourceManager.registerSealedTsFileResource(targetResource);
             } finally {
               writeUnlock();
             }
@@ -712,6 +726,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
                 unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
               }
               deleteLevelFilesInList(timePartition, toMergeTsFiles, i, 
sequence);
+              tsFileResourceManager.registerSealedTsFileResource(newResource);
               if (mergeResources.size() > i + 1) {
                 mergeResources.get(i + 1).add(newResource);
               }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 5c3d8b1..83581d1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.no;
 
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,9 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
   // includes sealed and unsealed unSequence TsFiles
   private final Map<Long, List<TsFileResource>> unSequenceFileListMap = new 
TreeMap<>();
 
+  /** manage TsFileResource degrade */
+  private TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();
+
   public NoCompactionTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
   }
@@ -110,6 +114,7 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
     } finally {
       writeUnlock();
     }
+    tsFileResourceManager.removeTsFileResource(tsFileResource);
   }
 
   @Override
@@ -155,6 +160,9 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
     } finally {
       writeUnlock();
     }
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      tsFileResourceManager.removeTsFileResource(tsFileResource);
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 985fb8b..b5d1363 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -63,6 +63,7 @@ import 
org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.utils.MmapUtil;
@@ -227,6 +228,10 @@ public class StorageGroupProcessor {
 
   /** manage seqFileList and unSeqFileList */
   private TsFileManagement tsFileManagement;
+
+  /** manage tsFileResource degrade */
+  private TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();
+
   /**
    * time partition id -> version controller which assigns a version for each 
MemTable and
    * deletion/update such that after they are persisted, the order of 
insertions, deletions and
@@ -761,77 +766,80 @@ public class StorageGroupProcessor {
           if (writer.hasCrashed()) {
             tsFileManagement.addRecover(tsFileResource, isSeq);
           } else {
-            tsFileResource.setClosed(true);
+            tsFileResource.close();
             tsFileManagement.add(tsFileResource, isSeq);
+            tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
           }
           continue;
         } else {
           writer =
               recoverPerformer.recover(true, this::getWalDirectByteBuffer, 
this::releaseWalBuffer);
         }
-      } catch (StorageGroupProcessorException e) {
-        logger.warn(
-            "Skip TsFile: {} because of error in recover: ", 
tsFileResource.getTsFilePath(), e);
-        continue;
-      }
 
-      if (i != tsFiles.size() - 1 || !writer.canWrite()) {
-        // not the last file or cannot write, just close it
-        tsFileResource.setClosed(true);
-      } else if (writer.canWrite()) {
-        // the last file is not closed, continue writing to in
-        TsFileProcessor tsFileProcessor;
-        if (isSeq) {
-          tsFileProcessor =
-              new TsFileProcessor(
-                  virtualStorageGroupId,
-                  storageGroupInfo,
-                  tsFileResource,
-                  this::closeUnsealedTsFileProcessorCallBack,
-                  this::updateLatestFlushTimeCallback,
-                  true,
-                  writer);
-          if (enableMemControl) {
-            TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
-            tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
-            this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+        if (i != tsFiles.size() - 1 || !writer.canWrite()) {
+          // not the last file or cannot write, just close it
+          tsFileResource.close();
+          tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+        } else if (writer.canWrite()) {
+          // the last file is not closed, continue writing to in
+          TsFileProcessor tsFileProcessor;
+          if (isSeq) {
+            tsFileProcessor =
+                new TsFileProcessor(
+                    virtualStorageGroupId,
+                    storageGroupInfo,
+                    tsFileResource,
+                    this::closeUnsealedTsFileProcessorCallBack,
+                    this::updateLatestFlushTimeCallback,
+                    true,
+                    writer);
+            if (enableMemControl) {
+              TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
+              tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+              this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+            }
+            workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
+          } else {
+            tsFileProcessor =
+                new TsFileProcessor(
+                    virtualStorageGroupId,
+                    storageGroupInfo,
+                    tsFileResource,
+                    this::closeUnsealedTsFileProcessorCallBack,
+                    this::unsequenceFlushCallback,
+                    false,
+                    writer);
+            if (enableMemControl) {
+              TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
+              tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+              this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+            }
+            workUnsequenceTsFileProcessors.put(timePartitionId, 
tsFileProcessor);
           }
-          workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
-        } else {
-          tsFileProcessor =
-              new TsFileProcessor(
-                  virtualStorageGroupId,
-                  storageGroupInfo,
-                  tsFileResource,
-                  this::closeUnsealedTsFileProcessorCallBack,
-                  this::unsequenceFlushCallback,
-                  false,
-                  writer);
+          tsFileResource.setProcessor(tsFileProcessor);
+          tsFileResource.removeResourceFile();
+          tsFileProcessor.setTimeRangeId(timePartitionId);
+          writer.makeMetadataVisible();
           if (enableMemControl) {
-            TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
-            tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
-            this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-          }
-          workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
-        }
-        tsFileResource.setProcessor(tsFileProcessor);
-        tsFileResource.removeResourceFile();
-        tsFileProcessor.setTimeRangeId(timePartitionId);
-        writer.makeMetadataVisible();
-        if (enableMemControl) {
-          // get chunkMetadata size
-          long chunkMetadataSize = 0;
-          for (Map<String, List<ChunkMetadata>> metaMap : 
writer.getMetadatasForQuery().values()) {
-            for (List<ChunkMetadata> metadatas : metaMap.values()) {
-              for (ChunkMetadata chunkMetadata : metadatas) {
-                chunkMetadataSize += chunkMetadata.calculateRamSize();
+            // get chunkMetadata size
+            long chunkMetadataSize = 0;
+            for (Map<String, List<ChunkMetadata>> metaMap :
+                writer.getMetadatasForQuery().values()) {
+              for (List<ChunkMetadata> metadatas : metaMap.values()) {
+                for (ChunkMetadata chunkMetadata : metadatas) {
+                  chunkMetadataSize += chunkMetadata.calculateRamSize();
+                }
               }
             }
+            
tsFileProcessor.getTsFileProcessorInfo().addTSPMemCost(chunkMetadataSize);
           }
-          
tsFileProcessor.getTsFileProcessorInfo().addTSPMemCost(chunkMetadataSize);
         }
+        tsFileManagement.add(tsFileResource, isSeq);
+      } catch (StorageGroupProcessorException | IOException e) {
+        logger.warn(
+            "Skip TsFile: {} because of error in recover: ", 
tsFileResource.getTsFilePath(), e);
+        continue;
       }
-      tsFileManagement.add(tsFileResource, isSeq);
     }
   }
 
@@ -2175,6 +2183,7 @@ public class StorageGroupProcessor {
     closeQueryLock.writeLock().lock();
     try {
       tsFileProcessor.close();
+      
tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource());
     } finally {
       closeQueryLock.writeLock().unlock();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 46be2b9..5bc2070 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.FileTimeIndex;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
 import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
@@ -155,6 +156,9 @@ public class TsFileResource {
 
   private long version = 0;
 
+  /** memory cost for the TsFileResource when it's calculated for the first 
time */
+  private long ramSize;
+
   public TsFileResource() {}
 
   public TsFileResource(TsFileResource other) throws IOException {
@@ -774,7 +778,12 @@ public class TsFileResource {
 
   /** @return resource map size */
   public long calculateRamSize() {
-    return timeIndex.calculateRamSize();
+    ramSize = timeIndex.calculateRamSize();
+    return ramSize;
+  }
+
+  public long getRamSize() {
+    return ramSize;
   }
 
   public void delete() throws IOException {
@@ -848,8 +857,31 @@ public class TsFileResource {
     this.timeIndex = timeIndex;
   }
 
-  // change tsFile name
+  public byte getTimeIndexType() {
+    return timeIndexType;
+  }
 
+  public int compareIndexDegradePriority(TsFileResource tsFileResource) {
+    int cmp = timeIndex.compareDegradePriority(tsFileResource.timeIndex);
+    return cmp == 0 ? 
file.getAbsolutePath().compareTo(tsFileResource.file.getAbsolutePath()) : cmp;
+  }
+
+  /** the DeviceTimeIndex degrade to FileTimeIndex and release memory */
+  public long degradeTimeIndex() {
+    TimeIndexLevel timeIndexLevel = TimeIndexLevel.valueOf(timeIndexType);
+    // if current timeIndex is FileTimeIndex, no need to degrade
+    if (timeIndexLevel == TimeIndexLevel.FILE_TIME_INDEX) return 0;
+    // get the minimum startTime
+    long startTime = timeIndex.getMinStartTime();
+    // get the maximum endTime
+    long endTime = timeIndex.getMaxEndTime();
+    // replace the DeviceTimeIndex with FileTimeIndex
+    timeIndex = new FileTimeIndex(startTime, endTime);
+    timeIndexType = 0;
+    return ramSize - timeIndex.calculateRamSize();
+  }
+
+  // change tsFile name
   public static String getNewTsFileName(long time, long version, int mergeCnt, 
int unSeqMergeCnt) {
     return time
         + FILE_NAME_SEPARATOR
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 013f111..5db2fb2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -39,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class DeviceTimeIndex implements ITimeIndex {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(DeviceTimeIndex.class);
+
   public static final int INIT_ARRAY_SIZE = 64;
 
   protected static final Map<String, String> cachedDevicePool =
@@ -53,6 +58,12 @@ public class DeviceTimeIndex implements ITimeIndex {
    */
   protected long[] endTimes;
 
+  /** min start time */
+  private long minStartTime = Long.MAX_VALUE;
+
+  /** max end time */
+  private long maxEndTime = Long.MIN_VALUE;
+
   /** device -> index of start times array and end times array */
   protected Map<String, Integer> deviceToIndex;
 
@@ -96,13 +107,15 @@ public class DeviceTimeIndex implements ITimeIndex {
   @Override
   public DeviceTimeIndex deserialize(InputStream inputStream) throws 
IOException {
     int deviceNum = ReadWriteIOUtils.readInt(inputStream);
-    Map<String, Integer> deviceMap = new ConcurrentHashMap<>();
-    long[] startTimesArray = new long[deviceNum];
-    long[] endTimesArray = new long[deviceNum];
+
+    startTimes = new long[deviceNum];
+    endTimes = new long[deviceNum];
 
     for (int i = 0; i < deviceNum; i++) {
-      startTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
-      endTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
+      startTimes[i] = ReadWriteIOUtils.readLong(inputStream);
+      endTimes[i] = ReadWriteIOUtils.readLong(inputStream);
+      minStartTime = Math.min(minStartTime, startTimes[i]);
+      maxEndTime = Math.max(maxEndTime, endTimes[i]);
     }
 
     for (int i = 0; i < deviceNum; i++) {
@@ -111,21 +124,22 @@ public class DeviceTimeIndex implements ITimeIndex {
       // use the deviceId from memory instead of the deviceId read from disk
       String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
       int index = ReadWriteIOUtils.readInt(inputStream);
-      deviceMap.put(cachedPath, index);
+      deviceToIndex.put(cachedPath, index);
     }
-    return new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray);
+    return this;
   }
 
   @Override
   public DeviceTimeIndex deserialize(ByteBuffer buffer) {
     int deviceNum = buffer.getInt();
-    Map<String, Integer> deviceMap = new ConcurrentHashMap<>(deviceNum);
-    long[] startTimesArray = new long[deviceNum];
-    long[] endTimesArray = new long[deviceNum];
+    startTimes = new long[deviceNum];
+    endTimes = new long[deviceNum];
 
     for (int i = 0; i < deviceNum; i++) {
-      startTimesArray[i] = buffer.getLong();
-      endTimesArray[i] = buffer.getLong();
+      startTimes[i] = buffer.getLong();
+      endTimes[i] = buffer.getLong();
+      minStartTime = Math.min(minStartTime, startTimes[i]);
+      maxEndTime = Math.max(maxEndTime, endTimes[i]);
     }
 
     for (int i = 0; i < deviceNum; i++) {
@@ -134,9 +148,9 @@ public class DeviceTimeIndex implements ITimeIndex {
       // use the deviceId from memory instead of the deviceId read from disk
       String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
       int index = buffer.getInt();
-      deviceMap.put(cachedPath, index);
+      deviceToIndex.put(cachedPath, index);
     }
-    return new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray);
+    return this;
   }
 
   @Override
@@ -263,6 +277,7 @@ public class DeviceTimeIndex implements ITimeIndex {
       int index = getDeviceIndex(deviceId);
       startTimes[index] = time;
     }
+    minStartTime = Math.min(minStartTime, time);
   }
 
   @Override
@@ -272,18 +287,21 @@ public class DeviceTimeIndex implements ITimeIndex {
       int index = getDeviceIndex(deviceId);
       endTimes[index] = time;
     }
+    maxEndTime = Math.max(maxEndTime, time);
   }
 
   @Override
   public void putStartTime(String deviceId, long time) {
     int index = getDeviceIndex(deviceId);
     startTimes[index] = time;
+    minStartTime = Math.min(minStartTime, time);
   }
 
   @Override
   public void putEndTime(String deviceId, long time) {
     int index = getDeviceIndex(deviceId);
     endTimes[index] = time;
+    maxEndTime = Math.max(maxEndTime, time);
   }
 
   @Override
@@ -306,4 +324,26 @@ public class DeviceTimeIndex implements ITimeIndex {
   public boolean checkDeviceIdExist(String deviceId) {
     return deviceToIndex.containsKey(deviceId);
   }
+
+  @Override
+  public long getMinStartTime() {
+    return minStartTime;
+  }
+
+  @Override
+  public long getMaxEndTime() {
+    return maxEndTime;
+  }
+
+  @Override
+  public int compareDegradePriority(ITimeIndex timeIndex) {
+    if (timeIndex instanceof DeviceTimeIndex) {
+      return Long.compare(getMinStartTime(), timeIndex.getMinStartTime());
+    } else if (timeIndex instanceof FileTimeIndex) {
+      return -1;
+    } else {
+      logger.error("Wrong timeIndex type {}", timeIndex.getClass().getName());
+      throw new RuntimeException("Wrong timeIndex type " + 
timeIndex.getClass().getName());
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index d427f34..54040b8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -173,12 +173,34 @@ public class FileTimeIndex implements ITimeIndex {
   }
 
   @Override
+  public long getMinStartTime() {
+    return startTime;
+  }
+
+  @Override
   public long getEndTime(String deviceId) {
     return endTime;
   }
 
   @Override
+  public long getMaxEndTime() {
+    return endTime;
+  }
+
+  @Override
   public boolean checkDeviceIdExist(String deviceId) {
     return true;
   }
+
+  @Override
+  public int compareDegradePriority(ITimeIndex timeIndex) {
+    if (timeIndex instanceof DeviceTimeIndex) {
+      return 1;
+    } else if (timeIndex instanceof FileTimeIndex) {
+      return Long.compare(startTime, timeIndex.getMinStartTime());
+    } else {
+      logger.error("Wrong timeIndex type {}", timeIndex.getClass().getName());
+      throw new RuntimeException("Wrong timeIndex type " + 
timeIndex.getClass().getName());
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index d53dd48..37fac06 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -155,4 +155,28 @@ public interface ITimeIndex {
    * @return true if the deviceId may exist in TsFile, otherwise false.
    */
   boolean checkDeviceIdExist(String deviceId);
+
+  /**
+   * get min start time of device
+   *
+   * @return min start time
+   */
+  long getMinStartTime();
+
+  /**
+   * get max end time of device
+   *
+   * @return max end time
+   */
+  long getMaxEndTime();
+
+  /**
+   * compare the priority of two ITimeIndex
+   *
+   * @param timeIndex another timeIndex
+   * @return value is less than 0 if the priority of this timeIndex is higher 
than the argument,
+   *     value is equal to 0 if the priority of this timeIndex is equal to the 
argument, value is
+   *     larger than 0 if the priority of this timeIndex is less than the 
argument
+   */
+  int compareDegradePriority(ITimeIndex timeIndex);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
new file mode 100644
index 0000000..2d5a21a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+
+public class TsFileResourceManager {
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileResourceManager.class);
+
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  /** threshold total memory for all TimeIndex */
+  private double TIME_INDEX_MEMORY_THRESHOLD =
+      CONFIG.getAllocateMemoryForRead() * 
CONFIG.getTimeIndexMemoryProportion();
+
+  /** store the sealed TsFileResource, sorted by priority of TimeIndex */
+  private final TreeSet<TsFileResource> sealedTsFileResources =
+      new TreeSet<>(TsFileResource::compareIndexDegradePriority);
+
+  /** total used memory for TimeIndex */
+  private long totalTimeIndexMemCost;
+
+  @TestOnly
+  public void setTimeIndexMemoryThreshold(double timeIndexMemoryThreshold) {
+    TIME_INDEX_MEMORY_THRESHOLD = timeIndexMemoryThreshold;
+  }
+
+  @TestOnly
+  public long getPriorityQueueSize() {
+    return sealedTsFileResources.size();
+  }
+
+  /**
+   * add the closed TsFileResource into priorityQueue and increase memory cost 
of timeIndex, once
+   * memory cost is larger than threshold, degradation is triggered.
+   */
+  public synchronized void registerSealedTsFileResource(TsFileResource 
tsFileResource) {
+    sealedTsFileResources.add(tsFileResource);
+    totalTimeIndexMemCost += tsFileResource.calculateRamSize();
+    chooseTsFileResourceToDegrade();
+  }
+
+  /** delete the TsFileResource in PriorityQueue when the source file is 
deleted */
+  public synchronized void removeTsFileResource(TsFileResource tsFileResource) 
{
+    sealedTsFileResources.remove(tsFileResource);
+    if (TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType())
+        == TimeIndexLevel.FILE_TIME_INDEX) {
+      totalTimeIndexMemCost -= tsFileResource.calculateRamSize();
+    } else {
+      totalTimeIndexMemCost -= tsFileResource.getRamSize();
+    }
+  }
+
+  /** once degradation is triggered, the total memory for timeIndex should 
reduce */
+  private void releaseTimeIndexMemCost(long memCost) {
+    totalTimeIndexMemCost -= memCost;
+  }
+
+  /**
+   * choose the top TsFileResource in priorityQueue to degrade until the 
memory is smaller than
+   * threshold.
+   */
+  private void chooseTsFileResourceToDegrade() {
+    while (totalTimeIndexMemCost > TIME_INDEX_MEMORY_THRESHOLD) {
+      TsFileResource tsFileResource = sealedTsFileResources.pollFirst();
+      if (tsFileResource == null
+          || TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType())
+              == TimeIndexLevel.FILE_TIME_INDEX) {
+        logger.error("Can't degrade any more");
+        throw new RuntimeException("Can't degrade any more");
+      }
+      long memoryReduce = tsFileResource.degradeTimeIndex();
+      releaseTimeIndexMemCost(memoryReduce);
+      // add the polled tsFileResource to the priority queue
+      sealedTsFileResources.add(tsFileResource);
+    }
+  }
+
+  /** function for clearing TsFileManager */
+  public synchronized void clear() {
+    if (this.sealedTsFileResources != null) {
+      this.sealedTsFileResources.clear();
+    }
+    this.totalTimeIndexMemCost = 0;
+  }
+
+  public static TsFileResourceManager getInstance() {
+    return TsFileResourceManager.InstanceHolder.instance;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static TsFileResourceManager instance = new 
TsFileResourceManager();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
new file mode 100644
index 0000000..7910824
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.*;
+
+public class IoTDBManageTsFileResourceIT {
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();
+  private double prevTimeIndexMemoryProportion;
+  private double prevTimeIndexMemoryThreshold;
+  private CompactionStrategy prevTsFileManagementStrategy;
+
+  private static String[] unSeqSQLs =
+      new String[] {
+        "insert into root.sg1.d1(time,s1) values(1, 1)",
+        "insert into root.sg1.d1(time,s2) values(2, 2)",
+        "flush",
+        "insert into root.sg1.d1(time,s1) values(9, 9)",
+        "insert into root.sg1.d1(time,s2) values(10, 10)",
+        "flush",
+        "insert into root.sg1.d1(time,s1) values(5, 5)",
+        "insert into root.sg1.d1(time,s2) values(6, 6)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(11, 11)",
+        "insert into root.sg1.d2(time,s2) values(12, 12)",
+        "flush",
+        "insert into root.sg1.d1(time,s1) values(13, 13)",
+        "insert into root.sg1.d1(time,s2) values(14, 14)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(7, 7)",
+        "insert into root.sg1.d2(time,s2) values(8, 8)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(3, 3)",
+        "insert into root.sg1.d2(time,s2) values(4, 4)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(15, 15)",
+        "insert into root.sg1.d2(time,s2) values(16, 16)",
+        "flush"
+      };
+
+  @Before
+  public void setUp() throws ClassNotFoundException {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+    prevTsFileManagementStrategy = CONFIG.getCompactionStrategy();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    prevTimeIndexMemoryThreshold =
+        prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
+    CONFIG.setCompactionStrategy(prevTsFileManagementStrategy);
+  }
+
+  @Test
+  public void multiResourceTest() throws SQLException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+      double curTimeIndexMemoryThreshold = 1288.5;
+      
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+      for (String sql : unSeqSQLs) {
+        statement.execute(sql);
+      }
+      statement.close();
+      List<TsFileResource> seqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getSequenceFileTreeSet());
+      assertEquals(5, seqResources.size());
+      // five tsFileResource are degraded in total, 2 are in seqResources and 
3 are in
+      // unSeqResources
+      for (int i = 0; i < seqResources.size(); i++) {
+        if (i < 2) {
+          assertEquals(
+              TimeIndexLevel.FILE_TIME_INDEX,
+              TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+        } else {
+          assertEquals(
+              TimeIndexLevel.DEVICE_TIME_INDEX,
+              TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+        }
+      }
+      List<TsFileResource> unSeqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getUnSequenceFileList());
+      assertEquals(3, unSeqResources.size());
+      for (TsFileResource resource : unSeqResources) {
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      }
+    } catch (StorageEngineException | IllegalPathException e) {
+      Assert.fail();
+    }
+
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.sg1.d1");
+      assertTrue(hasResultSet);
+      String[] exp = new String[] {"1,1.0", "5,5.0", "9,9.0", "13,13.0"};
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void oneResourceTest() throws SQLException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      double curTimeIndexMemoryThreshold = 290;
+      
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+      statement.execute("insert into root.sg1.wf01.wt01(timestamp, status) 
values (1000, true)");
+      statement.execute("insert into root.sg1.wf01.wt01(timestamp, status) 
values (2000, true)");
+      statement.execute("insert into root.sg1.wf01.wt01(timestamp, status) 
values (3000, true)");
+      statement.execute("flush");
+      statement.close();
+      List<TsFileResource> resources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getSequenceFileTreeSet());
+      assertEquals(1, resources.size());
+      for (TsFileResource resource : resources) {
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      }
+    } catch (StorageEngineException | IllegalPathException e) {
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void restartResourceTest()
+      throws SQLException, IllegalPathException, StorageEngineException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+      double curTimeIndexMemoryThreshold = 1288.5;
+      
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+      for (int i = 0; i < unSeqSQLs.length - 1; i++) {
+        statement.execute(unSeqSQLs[i]);
+      }
+      statement.close();
+      List<TsFileResource> seqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getSequenceFileTreeSet());
+      assertEquals(5, seqResources.size());
+
+      // Four tsFileResource are degraded in total, 1 are in seqResources and 
3 are in
+      // unSeqResources. The difference with the multiResourceTest is that 
last tsFileResource is
+      // not close, so degrade method can't be called.
+      for (int i = 0; i < seqResources.size(); i++) {
+        if (i < 4) {
+          assertTrue(seqResources.get(i).isClosed());
+        } else {
+          assertFalse(seqResources.get(i).isClosed());
+        }
+        if (i < 1) {
+          assertEquals(
+              TimeIndexLevel.FILE_TIME_INDEX,
+              TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+        } else {
+          assertEquals(
+              TimeIndexLevel.DEVICE_TIME_INDEX,
+              TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+        }
+      }
+      List<TsFileResource> unSeqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getUnSequenceFileList());
+      assertEquals(3, unSeqResources.size());
+      for (TsFileResource resource : unSeqResources) {
+        assertTrue(resource.isClosed());
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      }
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      Assert.fail();
+    }
+    List<TsFileResource> seqResources =
+        new ArrayList<>(
+            StorageEngine.getInstance()
+                .getProcessor(new PartialPath("root.sg1"))
+                .getSequenceFileTreeSet());
+    assertEquals(5, seqResources.size());
+    for (int i = 0; i < seqResources.size(); i++) {
+      assertTrue(seqResources.get(i).isClosed());
+    }
+    List<TsFileResource> unSeqResources =
+        new ArrayList<>(
+            StorageEngine.getInstance()
+                .getProcessor(new PartialPath("root.sg1"))
+                .getUnSequenceFileList());
+    assertEquals(3, unSeqResources.size());
+    for (TsFileResource resource : unSeqResources) {
+      assertEquals(
+          TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      assertTrue(resource.isClosed());
+    }
+
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.sg1.d1");
+      assertTrue(hasResultSet);
+      String[] exp = new String[] {"1,1.0", "5,5.0", "9,9.0", "13,13.0"};
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java 
b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
new file mode 100644
index 0000000..538d398
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+
+public class ResourceManagerTest {
+
+  static final String RESOURCE_MANAGER_TEST_SG = "root.resourceManagerTest";
+  private int seqFileNum = 10;
+  private int measurementNum = 10;
+  int deviceNum = 10;
+  long ptNum = 100;
+  long flushInterval = 20;
+  TSEncoding encoding = TSEncoding.PLAIN;
+
+  String[] deviceIds;
+  MeasurementSchema[] measurementSchemas;
+
+  List<TsFileResource> seqResources = new ArrayList<>();
+  List<TsFileResource> unseqResources = new ArrayList<>();
+
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();;
+  private double prevTimeIndexMemoryProportion;
+  private double prevTimeIndexMemoryThreshold;
+  private TimeIndexLevel timeIndexLevel;
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException, 
MetadataException {
+    IoTDB.metaManager.init();
+    prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+    timeIndexLevel = CONFIG.getTimeIndexLevel();
+    prepareSeries();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    removeFiles();
+    seqResources.clear();
+    unseqResources.clear();
+    CONFIG.setTimeIndexMemoryProportion(prevTimeIndexMemoryProportion);
+    CONFIG.setTimeIndexLevel(String.valueOf(timeIndexLevel));
+    prevTimeIndexMemoryThreshold =
+        prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+    IoTDB.metaManager.clear();
+    TsFileResourceManager.getInstance().clear();
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  void prepareSeries() throws MetadataException {
+    measurementSchemas = new MeasurementSchema[measurementNum];
+    for (int i = 0; i < measurementNum; i++) {
+      measurementSchemas[i] =
+          new MeasurementSchema(
+              "sensor" + i, TSDataType.DOUBLE, encoding, 
CompressionType.UNCOMPRESSED);
+    }
+    deviceIds = new String[deviceNum];
+    for (int i = 0; i < deviceNum; i++) {
+      deviceIds[i] = RESOURCE_MANAGER_TEST_SG + PATH_SEPARATOR + "device" + i;
+    }
+    IoTDB.metaManager.setStorageGroup(new 
PartialPath(RESOURCE_MANAGER_TEST_SG));
+    for (String device : deviceIds) {
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
+        PartialPath devicePath = new PartialPath(device);
+        IoTDB.metaManager.createTimeseries(
+            devicePath.concatNode(measurementSchema.getMeasurementId()),
+            measurementSchema.getType(),
+            measurementSchema.getEncodingType(),
+            measurementSchema.getCompressor(),
+            Collections.emptyMap());
+      }
+    }
+  }
+
+  private void removeFiles() throws IOException {
+    for (TsFileResource tsFileResource : seqResources) {
+      if (tsFileResource.getTsFile().exists()) {
+        tsFileResource.remove();
+      }
+    }
+    for (TsFileResource tsFileResource : unseqResources) {
+      if (tsFileResource.getTsFile().exists()) {
+        tsFileResource.remove();
+      }
+    }
+    File[] files = 
FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".tsfile");
+    for (File file : files) {
+      file.delete();
+    }
+    File[] resourceFiles =
+        FSFactoryProducer.getFSFactory().listFilesBySuffix("target", 
".resource");
+    for (File resourceFile : resourceFiles) {
+      resourceFile.delete();
+    }
+    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+    FileReaderManager.getInstance().stop();
+  }
+
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, 
long valueOffset)
+      throws IOException, WriteProcessException {
+    TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile());
+    for (String deviceId : deviceIds) {
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
+        fileWriter.registerTimeseries(
+            new Path(deviceId, measurementSchema.getMeasurementId()), 
measurementSchema);
+      }
+    }
+    for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+      for (int j = 0; j < deviceNum; j++) {
+        TSRecord record = new TSRecord(i, deviceIds[j]);
+        for (int k = 0; k < measurementNum; k++) {
+          record.addTuple(
+              DataPoint.getDataPoint(
+                  measurementSchemas[k].getType(),
+                  measurementSchemas[k].getMeasurementId(),
+                  String.valueOf(i + valueOffset)));
+        }
+        fileWriter.write(record);
+        tsFileResource.updateStartTime(deviceIds[j], i);
+        tsFileResource.updateEndTime(deviceIds[j], i);
+      }
+      if ((i + 1) % flushInterval == 0) {
+        fileWriter.flushAllChunkGroups();
+      }
+    }
+    fileWriter.close();
+  }
+
+  @Test
+  public void testDegradeMethod() throws IOException, WriteProcessException {
+    File file =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes((long) 0);
+    prepareFile(tsFileResource, 0, ptNum, 0);
+    long previousRamSize = tsFileResource.calculateRamSize();
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+    long reducedMemory = tsFileResource.degradeTimeIndex();
+    assertEquals(previousRamSize - tsFileResource.calculateRamSize(), 
reducedMemory);
+    assertEquals(
+        TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+  }
+
+  @Test
+  public void testDegradeToFileTimeIndex() throws IOException, 
WriteProcessException {
+    File file =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes((long) 0);
+    prepareFile(tsFileResource, 0, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+    double curTimeIndexMemoryThreshold = 322;
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+    assertEquals(
+        TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+  }
+
+  @Test
+  public void testNotDegradeToFileTimeIndex() throws IOException, 
WriteProcessException {
+    File file =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes((long) 0);
+    prepareFile(tsFileResource, 0, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+    long previousRamSize = tsFileResource.calculateRamSize();
+    double curTimeIndexMemoryThreshold = 3221;
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+    assertEquals(0, previousRamSize - tsFileResource.calculateRamSize());
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+  }
+
+  @Test
+  public void testTwoResourceToDegrade() throws IOException, 
WriteProcessException {
+    File file1 =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource1 = new TsFileResource(file1);
+    tsFileResource1.setClosed(true);
+    tsFileResource1.updatePlanIndexes((long) 0);
+    prepareFile(tsFileResource1, 0, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource1.getTimeIndexType()));
+    double curTimeIndexMemoryThreshold = 3221;
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource1);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource1.getTimeIndexType()));
+    File file2 =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource2 = new TsFileResource(file2);
+    tsFileResource2.setClosed(true);
+    tsFileResource2.updatePlanIndexes((long) 1);
+    prepareFile(tsFileResource2, ptNum, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource2.getTimeIndexType()));
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource2);
+    assertEquals(
+        TimeIndexLevel.FILE_TIME_INDEX, 
TimeIndexLevel.valueOf(tsFileResource1.getTimeIndexType()));
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource2.getTimeIndexType()));
+  }
+
+  @Test
+  public void testMultiDeviceTimeIndexDegrade() throws IOException, 
WriteProcessException {
+    double curTimeIndexMemoryThreshold = 9663.7;
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    for (int i = 0; i < seqFileNum; i++) {
+      File file =
+          new File(
+              TestConstant.BASE_OUTPUT_PATH.concat(
+                  i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + i
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + IoTDBConstant.FILE_NAME_SEPARATOR
+                      + 0
+                      + ".tsfile"));
+      TsFileResource tsFileResource = new TsFileResource(file);
+      tsFileResource.setClosed(true);
+      tsFileResource.updatePlanIndexes((long) i);
+      assertEquals(
+          TimeIndexLevel.DEVICE_TIME_INDEX,
+          TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+      seqResources.add(tsFileResource);
+      prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+      tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+    }
+    assertEquals(10, tsFileResourceManager.getPriorityQueueSize());
+    for (int i = 0; i < seqFileNum; i++) {
+      if (i < 7) {
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX,
+            TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+      } else {
+        assertEquals(
+            TimeIndexLevel.DEVICE_TIME_INDEX,
+            TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+      }
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testAllFileTimeIndexDegrade() throws IOException, 
WriteProcessException {
+    long reducedMemory = 0;
+    CONFIG.setTimeIndexLevel(String.valueOf(TimeIndexLevel.FILE_TIME_INDEX));
+    double curTimeIndexMemoryThreshold = 322;
+    
tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    try {
+      for (int i = 0; i < seqFileNum; i++) {
+        File file =
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    i
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + i
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile"));
+        TsFileResource tsFileResource = new TsFileResource(file);
+        tsFileResource.setClosed(true);
+        tsFileResource.updatePlanIndexes((long) i);
+        seqResources.add(tsFileResource);
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX,
+            TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+        long previousRamSize = tsFileResource.calculateRamSize();
+        prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+        tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX,
+            TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+        reducedMemory = previousRamSize - tsFileResource.calculateRamSize();
+      }
+    } catch (RuntimeException e) {
+      assertEquals(0, reducedMemory);
+      assertEquals(7, seqResources.size());
+      throw e;
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java 
b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index f058a87..5caab07 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TSocketWrapper;
@@ -161,6 +162,9 @@ public class EnvironmentUtils {
     // clear memtable manager info
     MemTableManager.getInstance().close();
 
+    // clear tsFileResource manager info
+    TsFileResourceManager.getInstance().clear();
+
     // delete all directory
     cleanAllDir();
     config.setSeqTsFileSize(oldSeqTsFileSize);
@@ -302,6 +306,7 @@ public class EnvironmentUtils {
     shutdownDaemon();
     stopDaemon();
     IoTDB.metaManager.clear();
+    TsFileResourceManager.getInstance().clear();
     reactiveDaemon();
   }
 

Reply via email to