This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 96ee5a8 [IOTDB-1372]Enhance management of TsFileResource (#4003)
96ee5a8 is described below
commit 96ee5a83fb05e5b9f72683125ec7e376c7eff258
Author: Yuting Yan <[email protected]>
AuthorDate: Sat Oct 9 01:15:17 2021 -0500
[IOTDB-1372]Enhance management of TsFileResource (#4003)
---
pom.xml | 2 +-
.../resources/conf/iotdb-engine.properties | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +
.../level/LevelCompactionTsFileManagement.java | 15 +
.../no/NoCompactionTsFileManagement.java | 8 +
.../engine/storagegroup/StorageGroupProcessor.java | 121 ++++---
.../db/engine/storagegroup/TsFileResource.java | 36 +-
.../storagegroup/timeindex/DeviceTimeIndex.java | 68 +++-
.../storagegroup/timeindex/FileTimeIndex.java | 22 ++
.../engine/storagegroup/timeindex/ITimeIndex.java | 24 ++
.../iotdb/db/rescon/TsFileResourceManager.java | 121 +++++++
.../integration/IoTDBManageTsFileResourceIT.java | 291 +++++++++++++++
.../iotdb/db/rescon/ResourceManagerTest.java | 401 +++++++++++++++++++++
.../apache/iotdb/db/utils/EnvironmentUtils.java | 5 +
15 files changed, 1063 insertions(+), 75 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7330d54..3b3fe13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1083,7 +1083,7 @@
</activation>
<properties>
<maven.compiler.release>8</maven.compiler.release>
- <argLine>--illegal-access=permit
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.t [...]
+ <argLine>--illegal-access=permit
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.java [...]
</properties>
</profile>
<!--
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index f0745fb..31b1bc8 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -359,6 +359,10 @@ timestamp_precision=ms
# Datatype: double
# flush_proportion=0.4
+# Ratio of read memory allocated for timeIndex, 0.2 by default
+# Datatype: double
+# time_index_memory_proportion=0.2
+
# Ratio of write memory allocated for buffered arrays, 0.6 by default
# Datatype: double
# buffered_arrays_memory_proportion=0.6
@@ -405,7 +409,6 @@ timestamp_precision=ms
# Datatype: int
# upgrade_thread_num=1
-
####################
### Query Configurations
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7ab0170..d3b218e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -125,6 +125,9 @@ public class IoTDBConfig {
/** Ratio of memory allocated for buffered arrays */
private double bufferedArraysMemoryProportion = 0.6;
+ /** Proportion of read memory allocated for timeIndex */
+ private double timeIndexMemoryProportion = 0.2;
+
/** Flush proportion for system */
private double flushProportion = 0.4;
@@ -1382,6 +1385,14 @@ public class IoTDBConfig {
this.bufferedArraysMemoryProportion = bufferedArraysMemoryProportion;
}
+ public double getTimeIndexMemoryProportion() {
+ return timeIndexMemoryProportion;
+ }
+
+ public void setTimeIndexMemoryProportion(double timeIndexMemoryProportion) {
+ this.timeIndexMemoryProportion = timeIndexMemoryProportion;
+ }
+
public double getFlushProportion() {
return flushProportion;
}
@@ -1422,7 +1433,7 @@ public class IoTDBConfig {
this.allocateMemoryForSchema = allocateMemoryForSchema;
}
- long getAllocateMemoryForRead() {
+ public long getAllocateMemoryForRead() {
return allocateMemoryForRead;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f6adf66..849625a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -175,6 +175,12 @@ public class IoTDBDescriptor {
"buffered_arrays_memory_proportion",
Double.toString(conf.getBufferedArraysMemoryProportion()))));
+ conf.setTimeIndexMemoryProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "time_index_memory_proportion",
+ Double.toString(conf.getTimeIndexMemoryProportion()))));
+
conf.setFlushProportion(
Double.parseDouble(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 03c14ba..f8fa374 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -89,6 +90,9 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
private final List<TsFileResource> sequenceRecoverTsFileResources = new
ArrayList<>();
private final List<TsFileResource> unSequenceRecoverTsFileResources = new
ArrayList<>();
+ /** manages the degradation of TsFileResource time indexes */
+ private TsFileResourceManager tsFileResourceManager =
TsFileResourceManager.getInstance();
+
public LevelCompactionTsFileManagement(String storageGroupName, String
storageGroupDir) {
super(storageGroupName, storageGroupDir);
clear();
@@ -149,6 +153,9 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
}
}
+ for (TsFileResource tsFileResource : mergeTsFiles) {
+ tsFileResourceManager.removeTsFileResource(tsFileResource);
+ }
}
private void deleteLevelFile(TsFileResource seqFile) {
@@ -238,6 +245,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
} finally {
writeUnlock();
}
+ tsFileResourceManager.removeTsFileResource(tsFileResource);
}
@Override
@@ -262,6 +270,9 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
} finally {
writeUnlock();
}
+ for (TsFileResource tsFileResource : tsFileResourceList) {
+ tsFileResourceManager.removeTsFileResource(tsFileResource);
+ }
}
@Override
@@ -475,6 +486,8 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
}
// complete compaction and delete source file
deleteAllSubLevelFiles(isSeq, timePartition);
+
+
tsFileResourceManager.registerSealedTsFileResource(targetTsFileResource);
} else {
// get tsfile resource from list, as they have been recovered in
StorageGroupProcessor
TsFileResource targetResource = getRecoverTsFileResource(targetFile,
isSeq);
@@ -519,6 +532,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
unSequenceRecoverTsFileResources.clear();
}
deleteLevelFilesInList(timePartition, sourceTsFileResources,
level, isSeq);
+
tsFileResourceManager.registerSealedTsFileResource(targetResource);
} finally {
writeUnlock();
}
@@ -712,6 +726,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
unSequenceTsFileResources.get(timePartition).get(i +
1).add(newResource);
}
deleteLevelFilesInList(timePartition, toMergeTsFiles, i,
sequence);
+ tsFileResourceManager.registerSealedTsFileResource(newResource);
if (mergeResources.size() > i + 1) {
mergeResources.get(i + 1).add(newResource);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 5c3d8b1..83581d1 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.no;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,9 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
// includes sealed and unsealed unSequence TsFiles
private final Map<Long, List<TsFileResource>> unSequenceFileListMap = new
TreeMap<>();
+ /** manages the degradation of TsFileResource time indexes */
+ private TsFileResourceManager tsFileResourceManager =
TsFileResourceManager.getInstance();
+
public NoCompactionTsFileManagement(String storageGroupName, String
storageGroupDir) {
super(storageGroupName, storageGroupDir);
}
@@ -110,6 +114,7 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
} finally {
writeUnlock();
}
+ tsFileResourceManager.removeTsFileResource(tsFileResource);
}
@Override
@@ -155,6 +160,9 @@ public class NoCompactionTsFileManagement extends
TsFileManagement {
} finally {
writeUnlock();
}
+ for (TsFileResource tsFileResource : tsFileResourceList) {
+ tsFileResourceManager.removeTsFileResource(tsFileResource);
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 985fb8b..b5d1363 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -63,6 +63,7 @@ import
org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.MmapUtil;
@@ -227,6 +228,10 @@ public class StorageGroupProcessor {
/** manage seqFileList and unSeqFileList */
private TsFileManagement tsFileManagement;
+
+ /** manages the degradation of TsFileResource time indexes */
+ private TsFileResourceManager tsFileResourceManager =
TsFileResourceManager.getInstance();
+
/**
* time partition id -> version controller which assigns a version for each
MemTable and
* deletion/update such that after they are persisted, the order of
insertions, deletions and
@@ -761,77 +766,80 @@ public class StorageGroupProcessor {
if (writer.hasCrashed()) {
tsFileManagement.addRecover(tsFileResource, isSeq);
} else {
- tsFileResource.setClosed(true);
+ tsFileResource.close();
tsFileManagement.add(tsFileResource, isSeq);
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
}
continue;
} else {
writer =
recoverPerformer.recover(true, this::getWalDirectByteBuffer,
this::releaseWalBuffer);
}
- } catch (StorageGroupProcessorException e) {
- logger.warn(
- "Skip TsFile: {} because of error in recover: ",
tsFileResource.getTsFilePath(), e);
- continue;
- }
- if (i != tsFiles.size() - 1 || !writer.canWrite()) {
- // not the last file or cannot write, just close it
- tsFileResource.setClosed(true);
- } else if (writer.canWrite()) {
- // the last file is not closed, continue writing to in
- TsFileProcessor tsFileProcessor;
- if (isSeq) {
- tsFileProcessor =
- new TsFileProcessor(
- virtualStorageGroupId,
- storageGroupInfo,
- tsFileResource,
- this::closeUnsealedTsFileProcessorCallBack,
- this::updateLatestFlushTimeCallback,
- true,
- writer);
- if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
- tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+ if (i != tsFiles.size() - 1 || !writer.canWrite()) {
+ // not the last file or cannot write, just close it
+ tsFileResource.close();
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+ } else if (writer.canWrite()) {
+ // the last file is not closed, continue writing to it
+ TsFileProcessor tsFileProcessor;
+ if (isSeq) {
+ tsFileProcessor =
+ new TsFileProcessor(
+ virtualStorageGroupId,
+ storageGroupInfo,
+ tsFileResource,
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::updateLatestFlushTimeCallback,
+ true,
+ writer);
+ if (enableMemControl) {
+ TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
+ tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+ this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+ }
+ workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
+ } else {
+ tsFileProcessor =
+ new TsFileProcessor(
+ virtualStorageGroupId,
+ storageGroupInfo,
+ tsFileResource,
+ this::closeUnsealedTsFileProcessorCallBack,
+ this::unsequenceFlushCallback,
+ false,
+ writer);
+ if (enableMemControl) {
+ TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
+ tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
+ this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+ }
+ workUnsequenceTsFileProcessors.put(timePartitionId,
tsFileProcessor);
}
- workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
- } else {
- tsFileProcessor =
- new TsFileProcessor(
- virtualStorageGroupId,
- storageGroupInfo,
- tsFileResource,
- this::closeUnsealedTsFileProcessorCallBack,
- this::unsequenceFlushCallback,
- false,
- writer);
+ tsFileResource.setProcessor(tsFileProcessor);
+ tsFileResource.removeResourceFile();
+ tsFileProcessor.setTimeRangeId(timePartitionId);
+ writer.makeMetadataVisible();
if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
- tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- }
- workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
- }
- tsFileResource.setProcessor(tsFileProcessor);
- tsFileResource.removeResourceFile();
- tsFileProcessor.setTimeRangeId(timePartitionId);
- writer.makeMetadataVisible();
- if (enableMemControl) {
- // get chunkMetadata size
- long chunkMetadataSize = 0;
- for (Map<String, List<ChunkMetadata>> metaMap :
writer.getMetadatasForQuery().values()) {
- for (List<ChunkMetadata> metadatas : metaMap.values()) {
- for (ChunkMetadata chunkMetadata : metadatas) {
- chunkMetadataSize += chunkMetadata.calculateRamSize();
+ // get chunkMetadata size
+ long chunkMetadataSize = 0;
+ for (Map<String, List<ChunkMetadata>> metaMap :
+ writer.getMetadatasForQuery().values()) {
+ for (List<ChunkMetadata> metadatas : metaMap.values()) {
+ for (ChunkMetadata chunkMetadata : metadatas) {
+ chunkMetadataSize += chunkMetadata.calculateRamSize();
+ }
}
}
+
tsFileProcessor.getTsFileProcessorInfo().addTSPMemCost(chunkMetadataSize);
}
-
tsFileProcessor.getTsFileProcessorInfo().addTSPMemCost(chunkMetadataSize);
}
+ tsFileManagement.add(tsFileResource, isSeq);
+ } catch (StorageGroupProcessorException | IOException e) {
+ logger.warn(
+ "Skip TsFile: {} because of error in recover: ",
tsFileResource.getTsFilePath(), e);
+ continue;
}
- tsFileManagement.add(tsFileResource, isSeq);
}
}
@@ -2175,6 +2183,7 @@ public class StorageGroupProcessor {
closeQueryLock.writeLock().lock();
try {
tsFileProcessor.close();
+
tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource());
} finally {
closeQueryLock.writeLock().unlock();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 46be2b9..5bc2070 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack;
import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.FileTimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
@@ -155,6 +156,9 @@ public class TsFileResource {
private long version = 0;
+ /** memory cost of the timeIndex, cached by the most recent calculateRamSize()
call */
+ private long ramSize;
+
public TsFileResource() {}
public TsFileResource(TsFileResource other) throws IOException {
@@ -774,7 +778,12 @@ public class TsFileResource {
/** @return resource map size */
public long calculateRamSize() {
- return timeIndex.calculateRamSize();
+ ramSize = timeIndex.calculateRamSize();
+ return ramSize;
+ }
+
+ public long getRamSize() {
+ return ramSize;
}
public void delete() throws IOException {
@@ -848,8 +857,31 @@ public class TsFileResource {
this.timeIndex = timeIndex;
}
- // change tsFile name
+ public byte getTimeIndexType() {
+ return timeIndexType;
+ }
+ public int compareIndexDegradePriority(TsFileResource tsFileResource) {
+ int cmp = timeIndex.compareDegradePriority(tsFileResource.timeIndex);
+ return cmp == 0 ?
file.getAbsolutePath().compareTo(tsFileResource.file.getAbsolutePath()) : cmp;
+ }
+
+ /** the DeviceTimeIndex degrade to FileTimeIndex and release memory */
+ public long degradeTimeIndex() {
+ TimeIndexLevel timeIndexLevel = TimeIndexLevel.valueOf(timeIndexType);
+ // if current timeIndex is FileTimeIndex, no need to degrade
+ if (timeIndexLevel == TimeIndexLevel.FILE_TIME_INDEX) return 0;
+ // get the minimum startTime
+ long startTime = timeIndex.getMinStartTime();
+ // get the maximum endTime
+ long endTime = timeIndex.getMaxEndTime();
+ // replace the DeviceTimeIndex with FileTimeIndex
+ timeIndex = new FileTimeIndex(startTime, endTime);
+ timeIndexType = 0;
+ return ramSize - timeIndex.calculateRamSize();
+ }
+
+ // change tsFile name
public static String getNewTsFileName(long time, long version, int mergeCnt,
int unSeqMergeCnt) {
return time
+ FILE_NAME_SEPARATOR
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 013f111..5db2fb2 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -39,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class DeviceTimeIndex implements ITimeIndex {
+ private static final Logger logger =
LoggerFactory.getLogger(DeviceTimeIndex.class);
+
public static final int INIT_ARRAY_SIZE = 64;
protected static final Map<String, String> cachedDevicePool =
@@ -53,6 +58,12 @@ public class DeviceTimeIndex implements ITimeIndex {
*/
protected long[] endTimes;
+ /** min start time */
+ private long minStartTime = Long.MAX_VALUE;
+
+ /** max end time */
+ private long maxEndTime = Long.MIN_VALUE;
+
/** device -> index of start times array and end times array */
protected Map<String, Integer> deviceToIndex;
@@ -96,13 +107,15 @@ public class DeviceTimeIndex implements ITimeIndex {
@Override
public DeviceTimeIndex deserialize(InputStream inputStream) throws
IOException {
int deviceNum = ReadWriteIOUtils.readInt(inputStream);
- Map<String, Integer> deviceMap = new ConcurrentHashMap<>();
- long[] startTimesArray = new long[deviceNum];
- long[] endTimesArray = new long[deviceNum];
+
+ startTimes = new long[deviceNum];
+ endTimes = new long[deviceNum];
for (int i = 0; i < deviceNum; i++) {
- startTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
- endTimesArray[i] = ReadWriteIOUtils.readLong(inputStream);
+ startTimes[i] = ReadWriteIOUtils.readLong(inputStream);
+ endTimes[i] = ReadWriteIOUtils.readLong(inputStream);
+ minStartTime = Math.min(minStartTime, startTimes[i]);
+ maxEndTime = Math.max(maxEndTime, endTimes[i]);
}
for (int i = 0; i < deviceNum; i++) {
@@ -111,21 +124,22 @@ public class DeviceTimeIndex implements ITimeIndex {
// use the deviceId from memory instead of the deviceId read from disk
String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
int index = ReadWriteIOUtils.readInt(inputStream);
- deviceMap.put(cachedPath, index);
+ deviceToIndex.put(cachedPath, index);
}
- return new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray);
+ return this;
}
@Override
public DeviceTimeIndex deserialize(ByteBuffer buffer) {
int deviceNum = buffer.getInt();
- Map<String, Integer> deviceMap = new ConcurrentHashMap<>(deviceNum);
- long[] startTimesArray = new long[deviceNum];
- long[] endTimesArray = new long[deviceNum];
+ startTimes = new long[deviceNum];
+ endTimes = new long[deviceNum];
for (int i = 0; i < deviceNum; i++) {
- startTimesArray[i] = buffer.getLong();
- endTimesArray[i] = buffer.getLong();
+ startTimes[i] = buffer.getLong();
+ endTimes[i] = buffer.getLong();
+ minStartTime = Math.min(minStartTime, startTimes[i]);
+ maxEndTime = Math.max(maxEndTime, endTimes[i]);
}
for (int i = 0; i < deviceNum; i++) {
@@ -134,9 +148,9 @@ public class DeviceTimeIndex implements ITimeIndex {
// use the deviceId from memory instead of the deviceId read from disk
String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
int index = buffer.getInt();
- deviceMap.put(cachedPath, index);
+ deviceToIndex.put(cachedPath, index);
}
- return new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray);
+ return this;
}
@Override
@@ -263,6 +277,7 @@ public class DeviceTimeIndex implements ITimeIndex {
int index = getDeviceIndex(deviceId);
startTimes[index] = time;
}
+ minStartTime = Math.min(minStartTime, time);
}
@Override
@@ -272,18 +287,21 @@ public class DeviceTimeIndex implements ITimeIndex {
int index = getDeviceIndex(deviceId);
endTimes[index] = time;
}
+ maxEndTime = Math.max(maxEndTime, time);
}
@Override
public void putStartTime(String deviceId, long time) {
int index = getDeviceIndex(deviceId);
startTimes[index] = time;
+ minStartTime = Math.min(minStartTime, time);
}
@Override
public void putEndTime(String deviceId, long time) {
int index = getDeviceIndex(deviceId);
endTimes[index] = time;
+ maxEndTime = Math.max(maxEndTime, time);
}
@Override
@@ -306,4 +324,26 @@ public class DeviceTimeIndex implements ITimeIndex {
public boolean checkDeviceIdExist(String deviceId) {
return deviceToIndex.containsKey(deviceId);
}
+
+ @Override
+ public long getMinStartTime() {
+ return minStartTime;
+ }
+
+ @Override
+ public long getMaxEndTime() {
+ return maxEndTime;
+ }
+
+ @Override
+ public int compareDegradePriority(ITimeIndex timeIndex) {
+ if (timeIndex instanceof DeviceTimeIndex) {
+ return Long.compare(getMinStartTime(), timeIndex.getMinStartTime());
+ } else if (timeIndex instanceof FileTimeIndex) {
+ return -1;
+ } else {
+ logger.error("Wrong timeIndex type {}", timeIndex.getClass().getName());
+ throw new RuntimeException("Wrong timeIndex type " +
timeIndex.getClass().getName());
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index d427f34..54040b8 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -173,12 +173,34 @@ public class FileTimeIndex implements ITimeIndex {
}
@Override
+ public long getMinStartTime() {
+ return startTime;
+ }
+
+ @Override
public long getEndTime(String deviceId) {
return endTime;
}
@Override
+ public long getMaxEndTime() {
+ return endTime;
+ }
+
+ @Override
public boolean checkDeviceIdExist(String deviceId) {
return true;
}
+
+ @Override
+ public int compareDegradePriority(ITimeIndex timeIndex) {
+ if (timeIndex instanceof DeviceTimeIndex) {
+ return 1;
+ } else if (timeIndex instanceof FileTimeIndex) {
+ return Long.compare(startTime, timeIndex.getMinStartTime());
+ } else {
+ logger.error("Wrong timeIndex type {}", timeIndex.getClass().getName());
+ throw new RuntimeException("Wrong timeIndex type " +
timeIndex.getClass().getName());
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index d53dd48..37fac06 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -155,4 +155,28 @@ public interface ITimeIndex {
* @return true if the deviceId may exist in TsFile, otherwise false.
*/
boolean checkDeviceIdExist(String deviceId);
+
+ /**
+ * get the minimum start time among all devices
+ *
+ * @return min start time
+ */
+ long getMinStartTime();
+
+ /**
+ * get the maximum end time among all devices
+ *
+ * @return max end time
+ */
+ long getMaxEndTime();
+
+ /**
+ * compare the priority of two ITimeIndex
+ *
+ * @param timeIndex another timeIndex
+ * @return value is less than 0 if the priority of this timeIndex is higher
than the argument,
+ * value is equal to 0 if the priority of this timeIndex is equal to the
argument, value is
+ * larger than 0 if the priority of this timeIndex is less than the
argument
+ */
+ int compareDegradePriority(ITimeIndex timeIndex);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
new file mode 100644
index 0000000..2d5a21a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+
+public class TsFileResourceManager {
+ private static final Logger logger =
LoggerFactory.getLogger(TsFileResourceManager.class);
+
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
+ /** threshold total memory for all TimeIndex */
+ private double TIME_INDEX_MEMORY_THRESHOLD =
+ CONFIG.getAllocateMemoryForRead() *
CONFIG.getTimeIndexMemoryProportion();
+
+ /** store the sealed TsFileResource, sorted by priority of TimeIndex */
+ private final TreeSet<TsFileResource> sealedTsFileResources =
+ new TreeSet<>(TsFileResource::compareIndexDegradePriority);
+
+ /** total used memory for TimeIndex */
+ private long totalTimeIndexMemCost;
+
+ @TestOnly
+ public void setTimeIndexMemoryThreshold(double timeIndexMemoryThreshold) {
+ TIME_INDEX_MEMORY_THRESHOLD = timeIndexMemoryThreshold;
+ }
+
+ @TestOnly
+ public long getPriorityQueueSize() {
+ return sealedTsFileResources.size();
+ }
+
+ /**
+ * add the closed TsFileResource into priorityQueue and increase memory cost
of timeIndex, once
+ * memory cost is larger than threshold, degradation is triggered.
+ */
+ public synchronized void registerSealedTsFileResource(TsFileResource
tsFileResource) {
+ sealedTsFileResources.add(tsFileResource);
+ totalTimeIndexMemCost += tsFileResource.calculateRamSize();
+ chooseTsFileResourceToDegrade();
+ }
+
+ /** delete the TsFileResource in PriorityQueue when the source file is
deleted */
+ public synchronized void removeTsFileResource(TsFileResource tsFileResource)
{
+ sealedTsFileResources.remove(tsFileResource);
+ if (TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType())
+ == TimeIndexLevel.FILE_TIME_INDEX) {
+ totalTimeIndexMemCost -= tsFileResource.calculateRamSize();
+ } else {
+ totalTimeIndexMemCost -= tsFileResource.getRamSize();
+ }
+ }
+
+ /** once degradation is triggered, the total memory for timeIndex should
reduce */
+ private void releaseTimeIndexMemCost(long memCost) {
+ totalTimeIndexMemCost -= memCost;
+ }
+
+ /**
+ * choose the top TsFileResource in priorityQueue to degrade until the
memory is smaller than
+ * threshold.
+ */
+ private void chooseTsFileResourceToDegrade() {
+ while (totalTimeIndexMemCost > TIME_INDEX_MEMORY_THRESHOLD) {
+ TsFileResource tsFileResource = sealedTsFileResources.pollFirst();
+ if (tsFileResource == null
+ || TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType())
+ == TimeIndexLevel.FILE_TIME_INDEX) {
+ logger.error("Can't degrade any more");
+ throw new RuntimeException("Can't degrade any more");
+ }
+ long memoryReduce = tsFileResource.degradeTimeIndex();
+ releaseTimeIndexMemCost(memoryReduce);
+ // add the polled tsFileResource to the priority queue
+ sealedTsFileResources.add(tsFileResource);
+ }
+ }
+
+ /** clears all registered TsFileResources and resets the total timeIndex memory cost */
+ public synchronized void clear() {
+ if (this.sealedTsFileResources != null) {
+ this.sealedTsFileResources.clear();
+ }
+ this.totalTimeIndexMemCost = 0;
+ }
+
+ public static TsFileResourceManager getInstance() {
+ return TsFileResourceManager.InstanceHolder.instance;
+ }
+
+ private static class InstanceHolder {
+ private InstanceHolder() {}
+
+ private static TsFileResourceManager instance = new
TsFileResourceManager();
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
new file mode 100644
index 0000000..7910824
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.*;
+
+public class IoTDBManageTsFileResourceIT {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private final TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
+  // Values captured in setUp() and written back in tearDown().
+  private double prevTimeIndexMemoryProportion;
+  private double prevTimeIndexMemoryThreshold;
+  private CompactionStrategy prevTsFileManagementStrategy;
+
+  // Eight insert/insert/flush rounds on root.sg1.d1 and root.sg1.d2. The rounds at times 5-6,
+  // 7-8 and 3-4 write timestamps older than data already flushed for the same device; the tests
+  // below expect 5 sequence and 3 unsequence files from the full script.
+  // final: the script is a shared, read-only fixture.
+  private static final String[] unSeqSQLs =
+      new String[] {
+        "insert into root.sg1.d1(time,s1) values(1, 1)",
+        "insert into root.sg1.d1(time,s2) values(2, 2)",
+        "flush",
+        "insert into root.sg1.d1(time,s1) values(9, 9)",
+        "insert into root.sg1.d1(time,s2) values(10, 10)",
+        "flush",
+        "insert into root.sg1.d1(time,s1) values(5, 5)",
+        "insert into root.sg1.d1(time,s2) values(6, 6)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(11, 11)",
+        "insert into root.sg1.d2(time,s2) values(12, 12)",
+        "flush",
+        "insert into root.sg1.d1(time,s1) values(13, 13)",
+        "insert into root.sg1.d1(time,s2) values(14, 14)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(7, 7)",
+        "insert into root.sg1.d2(time,s2) values(8, 8)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(3, 3)",
+        "insert into root.sg1.d2(time,s2) values(4, 4)",
+        "flush",
+        "insert into root.sg1.d2(time,s1) values(15, 15)",
+        "insert into root.sg1.d2(time,s2) values(16, 16)",
+        "flush"
+      };
+
+  /**
+   * Prepares the test environment, remembers the configuration values that the tests modify, and
+   * loads the JDBC driver class.
+   */
+  @Before
+  public void setUp() throws ClassNotFoundException {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    // Snapshot the settings so tearDown() can write them back.
+    this.prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+    this.prevTsFileManagementStrategy = CONFIG.getCompactionStrategy();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  /**
+   * Cleans the test environment and restores the time-index memory threshold and the compaction
+   * strategy captured in {@code setUp()}.
+   *
+   * @throws Exception if cleaning the environment fails; the settings are restored regardless
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvironmentUtils.cleanEnv();
+    } finally {
+      // Both the manager and CONFIG are shared singletons: restore them even when cleanEnv()
+      // throws, so a failure here cannot leak modified settings into later tests.
+      prevTimeIndexMemoryThreshold =
+          prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
+      tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
+      CONFIG.setCompactionStrategy(prevTsFileManagementStrategy);
+    }
+  }
+
+  /**
+   * Runs the whole {@code unSeqSQLs} script with compaction disabled and a lowered time-index
+   * memory threshold, checks which resources ended up with a FILE_TIME_INDEX, and finally verifies
+   * that the written data can still be queried.
+   *
+   * <p>Engine exceptions are propagated rather than turned into a bare {@code Assert.fail()}, so
+   * the failure cause appears in the test report.
+   */
+  @Test
+  public void multiResourceTest()
+      throws SQLException, IllegalPathException, StorageEngineException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+      double curTimeIndexMemoryThreshold = 1288.5;
+      tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+      for (String sql : unSeqSQLs) {
+        statement.execute(sql);
+      }
+      statement.close();
+      List<TsFileResource> seqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getSequenceFileTreeSet());
+      assertEquals(5, seqResources.size());
+      // Five TsFileResources are degraded in total: the first 2 sequence resources and all 3
+      // unsequence resources.
+      for (int i = 0; i < seqResources.size(); i++) {
+        TimeIndexLevel expectedLevel =
+            i < 2 ? TimeIndexLevel.FILE_TIME_INDEX : TimeIndexLevel.DEVICE_TIME_INDEX;
+        assertEquals(
+            expectedLevel, TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+      }
+      List<TsFileResource> unSeqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getUnSequenceFileList());
+      assertEquals(3, unSeqResources.size());
+      for (TsFileResource resource : unSeqResources) {
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      }
+    }
+
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.sg1.d1");
+      assertTrue(hasResultSet);
+      String[] exp = new String[] {"1,1.0", "5,5.0", "9,9.0", "13,13.0"};
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+      // Without this check an empty or truncated result set would pass unnoticed.
+      assertEquals(exp.length, cnt);
+    }
+  }
+
+  /**
+   * Flushes a single file under a lowered time-index memory threshold and expects its resource to
+   * have been degraded to a FILE_TIME_INDEX.
+   *
+   * <p>Engine exceptions are propagated rather than turned into a bare {@code Assert.fail()}, so
+   * the failure cause appears in the test report.
+   */
+  @Test
+  public void oneResourceTest()
+      throws SQLException, IllegalPathException, StorageEngineException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      double curTimeIndexMemoryThreshold = 290;
+      tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+      statement.execute("insert into root.sg1.wf01.wt01(timestamp, status) values (1000, true)");
+      statement.execute("insert into root.sg1.wf01.wt01(timestamp, status) values (2000, true)");
+      statement.execute("insert into root.sg1.wf01.wt01(timestamp, status) values (3000, true)");
+      statement.execute("flush");
+      statement.close();
+      List<TsFileResource> resources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getSequenceFileTreeSet());
+      assertEquals(1, resources.size());
+      for (TsFileResource resource : resources) {
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      }
+    }
+  }
+
+  /**
+   * Runs {@code unSeqSQLs} without its final "flush", checks the closed/degraded state of the
+   * resources, restarts the daemon, and then checks that every resource is closed and that the
+   * data is still queryable.
+   */
+  @Test
+  public void restartResourceTest()
+      throws SQLException, IllegalPathException, StorageEngineException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      CONFIG.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+      double curTimeIndexMemoryThreshold = 1288.5;
+      tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+      for (int i = 0; i < unSeqSQLs.length - 1; i++) {
+        statement.execute(unSeqSQLs[i]);
+      }
+      statement.close();
+      List<TsFileResource> seqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getSequenceFileTreeSet());
+      assertEquals(5, seqResources.size());
+
+      // Four TsFileResources are degraded in total: 1 sequence resource and all 3 unsequence
+      // resources. Unlike multiResourceTest, the last TsFileResource is not closed here, so it
+      // cannot be degraded.
+      for (int i = 0; i < seqResources.size(); i++) {
+        if (i < 4) {
+          assertTrue(seqResources.get(i).isClosed());
+        } else {
+          assertFalse(seqResources.get(i).isClosed());
+        }
+        TimeIndexLevel expectedLevel =
+            i < 1 ? TimeIndexLevel.FILE_TIME_INDEX : TimeIndexLevel.DEVICE_TIME_INDEX;
+        assertEquals(
+            expectedLevel, TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+      }
+      List<TsFileResource> unSeqResources =
+          new ArrayList<>(
+              StorageEngine.getInstance()
+                  .getProcessor(new PartialPath("root.sg1"))
+                  .getUnSequenceFileList());
+      assertEquals(3, unSeqResources.size());
+      for (TsFileResource resource : unSeqResources) {
+        assertTrue(resource.isClosed());
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      }
+    }
+
+    try {
+      EnvironmentUtils.restartDaemon();
+    } catch (Exception e) {
+      // Keep the cause attached so a restart failure can be diagnosed from the test report.
+      throw new AssertionError("Failed to restart the daemon", e);
+    }
+    List<TsFileResource> seqResources =
+        new ArrayList<>(
+            StorageEngine.getInstance()
+                .getProcessor(new PartialPath("root.sg1"))
+                .getSequenceFileTreeSet());
+    assertEquals(5, seqResources.size());
+    for (int i = 0; i < seqResources.size(); i++) {
+      assertTrue(seqResources.get(i).isClosed());
+    }
+    List<TsFileResource> unSeqResources =
+        new ArrayList<>(
+            StorageEngine.getInstance()
+                .getProcessor(new PartialPath("root.sg1"))
+                .getUnSequenceFileList());
+    assertEquals(3, unSeqResources.size());
+    for (TsFileResource resource : unSeqResources) {
+      assertEquals(
+          TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(resource.getTimeIndexType()));
+      assertTrue(resource.isClosed());
+    }
+
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.sg1.d1");
+      assertTrue(hasResultSet);
+      String[] exp = new String[] {"1,1.0", "5,5.0", "9,9.0", "13,13.0"};
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String result = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2);
+          assertEquals(exp[cnt], result);
+          cnt++;
+        }
+      }
+      // Without this check an empty or truncated result set would pass unnoticed.
+      assertEquals(exp.length, cnt);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
new file mode 100644
index 0000000..538d398
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.rescon;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+
+public class ResourceManagerTest {
+
+ static final String RESOURCE_MANAGER_TEST_SG = "root.resourceManagerTest";
+ private int seqFileNum = 10;
+ private int measurementNum = 10;
+ int deviceNum = 10;
+ long ptNum = 100;
+ long flushInterval = 20;
+ TSEncoding encoding = TSEncoding.PLAIN;
+
+ String[] deviceIds;
+ MeasurementSchema[] measurementSchemas;
+
+ List<TsFileResource> seqResources = new ArrayList<>();
+ List<TsFileResource> unseqResources = new ArrayList<>();
+
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
+  // Values captured in setUp() and written back in tearDown().
+  private double prevTimeIndexMemoryProportion;
+  private double prevTimeIndexMemoryThreshold;
+  private TimeIndexLevel timeIndexLevel;
+
+  /**
+   * Initializes the metadata manager, remembers the time-index settings that the tests modify,
+   * and registers the time series used by the generated files.
+   */
+  @Before
+  public void setUp() throws IOException, WriteProcessException, MetadataException {
+    IoTDB.metaManager.init();
+    // Snapshot the settings so tearDown() can write them back.
+    this.prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+    this.timeIndexLevel = CONFIG.getTimeIndexLevel();
+    prepareSeries();
+  }
+
+  /**
+   * Deletes the generated files, restores the time-index settings captured in {@code setUp()},
+   * and clears the caches, the metadata manager, the resource manager and the data directories.
+   */
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    removeFiles();
+    seqResources.clear();
+    unseqResources.clear();
+
+    // Write the remembered configuration back.
+    CONFIG.setTimeIndexMemoryProportion(prevTimeIndexMemoryProportion);
+    CONFIG.setTimeIndexLevel(String.valueOf(timeIndexLevel));
+    prevTimeIndexMemoryThreshold =
+        prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
+    tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
+
+    // Reset shared state touched by the tests.
+    ChunkCache.getInstance().clear();
+    TimeSeriesMetadataCache.getInstance().clear();
+    IoTDB.metaManager.clear();
+    TsFileResourceManager.getInstance().clear();
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  /**
+   * Builds {@code measurementNum} DOUBLE sensor schemas and {@code deviceNum} device ids, sets the
+   * test storage group, and creates one time series per (device, sensor) pair.
+   */
+  void prepareSeries() throws MetadataException {
+    measurementSchemas = new MeasurementSchema[measurementNum];
+    for (int sensorIdx = 0; sensorIdx < measurementNum; sensorIdx++) {
+      String sensorName = "sensor" + sensorIdx;
+      measurementSchemas[sensorIdx] =
+          new MeasurementSchema(
+              sensorName, TSDataType.DOUBLE, encoding, CompressionType.UNCOMPRESSED);
+    }
+
+    deviceIds = new String[deviceNum];
+    for (int deviceIdx = 0; deviceIdx < deviceNum; deviceIdx++) {
+      deviceIds[deviceIdx] = RESOURCE_MANAGER_TEST_SG + PATH_SEPARATOR + "device" + deviceIdx;
+    }
+
+    IoTDB.metaManager.setStorageGroup(new PartialPath(RESOURCE_MANAGER_TEST_SG));
+    for (String deviceId : deviceIds) {
+      for (MeasurementSchema schema : measurementSchemas) {
+        IoTDB.metaManager.createTimeseries(
+            new PartialPath(deviceId).concatNode(schema.getMeasurementId()),
+            schema.getType(),
+            schema.getEncodingType(),
+            schema.getCompressor(),
+            Collections.emptyMap());
+      }
+    }
+  }
+
+  /**
+   * Removes every registered resource whose TsFile still exists, deletes leftover ".tsfile" and
+   * ".resource" files under "target", and closes and stops the file reader manager.
+   */
+  private void removeFiles() throws IOException {
+    for (TsFileResource seqResource : seqResources) {
+      if (seqResource.getTsFile().exists()) {
+        seqResource.remove();
+      }
+    }
+    for (TsFileResource unseqResource : unseqResources) {
+      if (unseqResource.getTsFile().exists()) {
+        unseqResource.remove();
+      }
+    }
+
+    for (File leftoverTsFile :
+        FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".tsfile")) {
+      leftoverTsFile.delete();
+    }
+    for (File leftoverResourceFile :
+        FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".resource")) {
+      leftoverResourceFile.delete();
+    }
+
+    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+    FileReaderManager.getInstance().stop();
+  }
+
+  /**
+   * Writes a TsFile for {@code tsFileResource} containing {@code ptNum} consecutive timestamps for
+   * every device and sensor, and records each written timestamp as start/end time on the resource.
+   *
+   * @param tsFileResource resource whose TsFile is written and whose time index is updated
+   * @param timeOffset first timestamp to write
+   * @param ptNum number of consecutive timestamps to write
+   * @param valueOffset added to the timestamp to form the value of each data point
+   * @throws IOException if writing or closing the file fails
+   * @throws WriteProcessException if registering a series or writing a record fails
+   */
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
+      throws IOException, WriteProcessException {
+    TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile());
+    try {
+      for (String deviceId : deviceIds) {
+        for (MeasurementSchema measurementSchema : measurementSchemas) {
+          fileWriter.registerTimeseries(
+              new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
+        }
+      }
+      for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+        for (int j = 0; j < deviceNum; j++) {
+          TSRecord record = new TSRecord(i, deviceIds[j]);
+          for (int k = 0; k < measurementNum; k++) {
+            record.addTuple(
+                DataPoint.getDataPoint(
+                    measurementSchemas[k].getType(),
+                    measurementSchemas[k].getMeasurementId(),
+                    String.valueOf(i + valueOffset)));
+          }
+          fileWriter.write(record);
+          tsFileResource.updateStartTime(deviceIds[j], i);
+          tsFileResource.updateEndTime(deviceIds[j], i);
+        }
+        // Flush the buffered chunk groups every flushInterval timestamps.
+        if ((i + 1) % flushInterval == 0) {
+          fileWriter.flushAllChunkGroups();
+        }
+      }
+    } finally {
+      // Close the writer on failure too, so the file is not left open while tearDown() deletes it.
+      fileWriter.close();
+    }
+  }
+
+  /**
+   * Degrades a freshly written resource directly and checks that the reported reduction equals
+   * the drop in its calculated RAM size and that it now uses a FILE_TIME_INDEX.
+   */
+  @Test
+  public void testDegradeMethod() throws IOException, WriteProcessException {
+    String fileName =
+        0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile";
+    TsFileResource tsFileResource =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(fileName)));
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes(0L);
+    prepareFile(tsFileResource, 0, ptNum, 0);
+
+    long ramSizeBefore = tsFileResource.calculateRamSize();
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+
+    long reducedMemory = tsFileResource.degradeTimeIndex();
+    long ramSizeAfter = tsFileResource.calculateRamSize();
+    assertEquals(ramSizeBefore - ramSizeAfter, reducedMemory);
+    assertEquals(
+        TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+  }
+
+  /**
+   * Registers one sealed resource with the manager under a threshold of 322 and expects the
+   * registration to leave it with a FILE_TIME_INDEX.
+   */
+  @Test
+  public void testDegradeToFileTimeIndex() throws IOException, WriteProcessException {
+    String fileName =
+        0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile";
+    TsFileResource tsFileResource =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(fileName)));
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes(0L);
+    prepareFile(tsFileResource, 0, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+
+    double curTimeIndexMemoryThreshold = 322;
+    tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+
+    assertEquals(
+        TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+  }
+
+  /**
+   * Registers one sealed resource with the manager under a threshold of 3221 and expects it to
+   * keep both its calculated RAM size and its DEVICE_TIME_INDEX.
+   */
+  @Test
+  public void testNotDegradeToFileTimeIndex() throws IOException, WriteProcessException {
+    String fileName =
+        0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile";
+    TsFileResource tsFileResource =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(fileName)));
+    tsFileResource.setClosed(true);
+    tsFileResource.updatePlanIndexes(0L);
+    prepareFile(tsFileResource, 0, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+
+    long ramSizeBefore = tsFileResource.calculateRamSize();
+    double curTimeIndexMemoryThreshold = 3221;
+    tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+
+    // Registration must not have changed the resource.
+    assertEquals(0, ramSizeBefore - tsFileResource.calculateRamSize());
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+  }
+
+  /**
+   * Registers two sealed resources under a threshold of 3221: the first stays at
+   * DEVICE_TIME_INDEX until the second is registered, after which the first is expected to be
+   * degraded while the second keeps its DEVICE_TIME_INDEX.
+   */
+  @Test
+  public void testTwoResourceToDegrade() throws IOException, WriteProcessException {
+    // First resource: registering it alone must not trigger a degradation.
+    String firstFileName =
+        0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile";
+    TsFileResource tsFileResource1 =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(firstFileName)));
+    tsFileResource1.setClosed(true);
+    tsFileResource1.updatePlanIndexes(0L);
+    prepareFile(tsFileResource1, 0, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource1.getTimeIndexType()));
+    double curTimeIndexMemoryThreshold = 3221;
+    tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource1);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource1.getTimeIndexType()));
+
+    // Second resource: registering it is expected to degrade the first one.
+    String secondFileName =
+        1
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile";
+    TsFileResource tsFileResource2 =
+        new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(secondFileName)));
+    tsFileResource2.setClosed(true);
+    tsFileResource2.updatePlanIndexes(1L);
+    prepareFile(tsFileResource2, ptNum, ptNum, 0);
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource2.getTimeIndexType()));
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource2);
+
+    assertEquals(
+        TimeIndexLevel.FILE_TIME_INDEX, TimeIndexLevel.valueOf(tsFileResource1.getTimeIndexType()));
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource2.getTimeIndexType()));
+  }
+
+  /**
+   * Registers {@code seqFileNum} sealed resources under a threshold of 9663.7 and expects all of
+   * them to stay tracked, with the first 7 degraded to FILE_TIME_INDEX and the rest still at
+   * DEVICE_TIME_INDEX.
+   */
+  @Test
+  public void testMultiDeviceTimeIndexDegrade() throws IOException, WriteProcessException {
+    double curTimeIndexMemoryThreshold = 9663.7;
+    tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    for (int i = 0; i < seqFileNum; i++) {
+      String fileName =
+          i
+              + IoTDBConstant.FILE_NAME_SEPARATOR
+              + i
+              + IoTDBConstant.FILE_NAME_SEPARATOR
+              + 0
+              + IoTDBConstant.FILE_NAME_SEPARATOR
+              + 0
+              + ".tsfile";
+      TsFileResource tsFileResource =
+          new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(fileName)));
+      tsFileResource.setClosed(true);
+      tsFileResource.updatePlanIndexes((long) i);
+      assertEquals(
+          TimeIndexLevel.DEVICE_TIME_INDEX,
+          TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+      seqResources.add(tsFileResource);
+      prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+      tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+    }
+
+    assertEquals(10, tsFileResourceManager.getPriorityQueueSize());
+    for (int i = 0; i < seqFileNum; i++) {
+      TimeIndexLevel expectedLevel =
+          i < 7 ? TimeIndexLevel.FILE_TIME_INDEX : TimeIndexLevel.DEVICE_TIME_INDEX;
+      assertEquals(
+          expectedLevel, TimeIndexLevel.valueOf(seqResources.get(i).getTimeIndexType()));
+    }
+  }
+
+  /**
+   * With the configured level set to FILE_TIME_INDEX and a threshold of 322, keeps registering
+   * resources until the manager throws. The exception is expected once 7 resources have been
+   * created, with no memory having been reduced, and is rethrown for JUnit to verify.
+   */
+  @Test(expected = RuntimeException.class)
+  public void testAllFileTimeIndexDegrade() throws IOException, WriteProcessException {
+    long reducedMemory = 0;
+    CONFIG.setTimeIndexLevel(String.valueOf(TimeIndexLevel.FILE_TIME_INDEX));
+    double curTimeIndexMemoryThreshold = 322;
+    tsFileResourceManager.setTimeIndexMemoryThreshold(curTimeIndexMemoryThreshold);
+    try {
+      for (int i = 0; i < seqFileNum; i++) {
+        String fileName =
+            i
+                + IoTDBConstant.FILE_NAME_SEPARATOR
+                + i
+                + IoTDBConstant.FILE_NAME_SEPARATOR
+                + 0
+                + IoTDBConstant.FILE_NAME_SEPARATOR
+                + 0
+                + ".tsfile";
+        TsFileResource tsFileResource =
+            new TsFileResource(new File(TestConstant.BASE_OUTPUT_PATH.concat(fileName)));
+        tsFileResource.setClosed(true);
+        tsFileResource.updatePlanIndexes((long) i);
+        seqResources.add(tsFileResource);
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX,
+            TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+        long ramSizeBefore = tsFileResource.calculateRamSize();
+        prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+        tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+        assertEquals(
+            TimeIndexLevel.FILE_TIME_INDEX,
+            TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+        reducedMemory = ramSizeBefore - tsFileResource.calculateRamSize();
+      }
+    } catch (RuntimeException e) {
+      // Verify the state at the moment the manager gave up, then let JUnit see the exception.
+      assertEquals(0, reducedMemory);
+      assertEquals(7, seqResources.size());
+      throw e;
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index f058a87..5caab07 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSocketWrapper;
@@ -161,6 +162,9 @@ public class EnvironmentUtils {
// clear memtable manager info
MemTableManager.getInstance().close();
+ // clear tsFileResource manager info
+ TsFileResourceManager.getInstance().clear();
+
// delete all directory
cleanAllDir();
config.setSeqTsFileSize(oldSeqTsFileSize);
@@ -302,6 +306,7 @@ public class EnvironmentUtils {
shutdownDaemon();
stopDaemon();
IoTDB.metaManager.clear();
+ TsFileResourceManager.getInstance().clear();
reactiveDaemon();
}