This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-707 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 84b57b081267dc21d213db2cc0d9f3053dba9887 Author: HTHou <[email protected]> AuthorDate: Tue May 26 18:07:15 2020 +0800 Optimize TsFileResource memory usage --- .../org/apache/iotdb/db/engine/StorageEngine.java | 6 +- .../merge/selector/MaxFileMergeFileSelector.java | 10 +- .../db/engine/merge/task/MergeMultiChunkTask.java | 6 +- .../engine/storagegroup/StorageGroupProcessor.java | 62 ++++--- .../db/engine/storagegroup/TsFileResource.java | 196 ++++++++++++++++----- .../iotdb/db/query/executor/LastQueryExecutor.java | 2 +- .../db/query/executor/fill/LastPointReader.java | 10 +- .../iotdb/db/query/reader/series/SeriesReader.java | 6 +- .../db/sync/receiver/load/FileLoaderManager.java | 2 +- .../apache/iotdb/db/tools/IoTDBDataDirViewer.java | 10 +- .../iotdb/db/tools/TsFileResourcePrinter.java | 10 +- .../iotdb/db/writelog/recover/LogReplayer.java | 8 +- .../engine/storagegroup/TsFileProcessorTest.java | 6 +- .../db/sync/receiver/load/FileLoaderTest.java | 8 +- .../recover/SyncReceiverLogAnalyzerTest.java | 4 +- .../iotdb/db/writelog/recover/LogReplayerTest.java | 9 +- .../recover/RecoverResourceFromReaderTest.java | 8 +- .../db/writelog/recover/SeqTsFileRecoverTest.java | 16 +- .../writelog/recover/UnseqTsFileRecoverTest.java | 8 +- 19 files changed, 246 insertions(+), 141 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 9dff8b3..ca6fc86 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -463,11 +463,11 @@ public class StorageEngine implements IService { public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException, StorageEngineException, MetadataException { - Map<String, Long> startTimeMap = newTsFileResource.getStartTimeMap(); - if (startTimeMap == null || startTimeMap.isEmpty()) { + Map<String, Integer> deviceMap = newTsFileResource.getDeviceToIndexMap(); + if (deviceMap == null || deviceMap.isEmpty()) { throw new StorageEngineException("Can not get the corresponding storage group."); } - String device = startTimeMap.keySet().iterator().next(); + String device = deviceMap.keySet().iterator().next(); String storageGroupName = MManager.getInstance().getStorageGroupName(device); getProcessor(storageGroupName).loadNewTsFile(newTsFileResource); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java index 1a26c89..e077993 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java @@ -213,18 +213,18 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector { private void selectOverlappedSeqFiles(TsFileResource unseqFile) { int tmpSelectedNum = 0; - for (Entry<String, Long> deviceStartTimeEntry : unseqFile.getStartTimeMap().entrySet()) { + for (Entry<String, Integer> deviceStartTimeEntry : unseqFile.getDeviceToIndexMap().entrySet()) { String deviceId = deviceStartTimeEntry.getKey(); - Long unseqStartTime = deviceStartTimeEntry.getValue(); - Long unseqEndTime = unseqFile.getEndTimeMap().get(deviceId); + long unseqStartTime = unseqFile.getStartTime(deviceId); + long unseqEndTime = unseqFile.getEndTime(deviceId); boolean noMoreOverlap = false; for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) { TsFileResource seqFile = resource.getSeqFiles().get(i); - if (seqSelected[i] || !seqFile.getEndTimeMap().containsKey(deviceId)) { + if (seqSelected[i] || seqFile.getEndTime(deviceId) < 0) { continue; } - long seqEndTime = seqFile.getEndTimeMap().get(deviceId); + long seqEndTime = seqFile.getEndTime(deviceId); if (unseqEndTime <= seqEndTime) { // the unseqFile overlaps current seqFile tmpSelectedSeqFiles.add(i); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java index f63a5cc..e3de98f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java @@ -152,9 +152,9 @@ class MergeMultiChunkTask { throws IOException { TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx); String deviceId = currMergingPaths.get(0).getDevice(); - Long currDeviceMinTime = currTsFile.getStartTimeMap().get(deviceId); + long currDeviceMinTime = currTsFile.getStartTime(deviceId); //COMMENTS: is this correct? how about if there are other devices (in the currMergingPaths) that have unseq data? - if (currDeviceMinTime == null) { + if (currDeviceMinTime < 0) { return; } @@ -198,7 +198,7 @@ class MergeMultiChunkTask { mergeFileWriter.writeVersion(0L); mergeFileWriter.endChunkGroup(); mergeLogger.logFilePosition(mergeFileWriter.getFile()); - currTsFile.getStartTimeMap().put(deviceId, currDeviceMinTime); + currTsFile.putStartTime(deviceId, currDeviceMinTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 700f44c..5382296 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -318,12 +318,18 @@ public class StorageGroupProcessor { for (TsFileResource resource : sequenceFileTreeSet) { long timePartitionId = resource.getTimePartition(); + Map<String, Long> endTimeMap = new HashMap<>(); + for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) { + String deviceId = entry.getKey(); + long endTime = resource.getEndTime(deviceId); + endTimeMap.put(deviceId, endTime); + } latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()) - .putAll(resource.getEndTimeMap()); + .putAll(endTimeMap); partitionLatestFlushedTimeForEachDevice .computeIfAbsent(timePartitionId, id -> new HashMap<>()) - .putAll(resource.getEndTimeMap()); - globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap()); + .putAll(endTimeMap); + globalLatestFlushedTimeForEachDevice.putAll(endTimeMap); } } @@ -344,16 +350,16 @@ public class StorageGroupProcessor { VersionController versionController = new SimpleFileVersionController(storageGroupSysDir.getPath()); long currentVersion = versionController.currVersion(); for (TsFileResource resource : upgradeSeqFileList) { - for (Entry<String, Long> entry : resource.getEndTimeMap().entrySet()) { + for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) { String deviceId = entry.getKey(); - long endTime = entry.getValue(); + long endTime = resource.getEndTime(deviceId); long endTimePartitionId = StorageEngine.getTimePartition(endTime); latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>()) .put(deviceId, endTime); - globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap()); + globalLatestFlushedTimeForEachDevice.put(deviceId, endTime); // set all the covered partition's LatestFlushedTime to Long.MAX_VALUE - long partitionId = StorageEngine.getTimePartition(resource.startTimeMap.get(deviceId)); + long partitionId = StorageEngine.getTimePartition(resource.getStartTime(deviceId)); while (partitionId <= endTimePartitionId) { partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>()) .put(deviceId, Long.MAX_VALUE); @@ -530,7 +536,7 @@ public class StorageGroupProcessor { workSequenceTsFileProcessors .put(timePartitionId, tsFileProcessor); tsFileResource.setProcessor(tsFileProcessor); - tsFileResource.endTimeMap.clear(); + tsFileResource.clearEndTimes(); tsFileResource.removeResourceFile(); tsFileProcessor.setTimeRangeId(timePartitionId); writer.makeMetadataVisible(); @@ -1235,8 +1241,9 @@ public class StorageGroupProcessor { .query(deviceId, measurementId, schema.getType(), schema.getEncodingType(), schema.getProps(), context); - tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), - tsFileResource.getStartTimeMap(), tsFileResource.getEndTimeMap(), pair.left, + tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), + tsFileResource.getDeviceToIndexMap(), + tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), pair.left, pair.right)); } } catch (IOException e) { @@ -1269,13 +1276,13 @@ public class StorageGroupProcessor { return false; } if (dataTTL != Long.MAX_VALUE) { - Long deviceEndTime = tsFileResource.getEndTimeMap().get(deviceId); - return deviceEndTime == null || checkTTL(deviceEndTime); + long deviceEndTime = tsFileResource.getEndTime(deviceId); + return deviceEndTime < 0 || checkTTL(deviceEndTime); } if (timeFilter != null) { - long startTime = tsFileResource.getStartTimeMap().get(deviceId); - long endTime = tsFileResource.getEndTimeMap().getOrDefault(deviceId, Long.MAX_VALUE); + long startTime = tsFileResource.getStartTime(deviceId); + long endTime = tsFileResource.getOrDefaultEndTime(deviceId, Long.MAX_VALUE); return timeFilter.satisfyStartEndTime(startTime, endTime); } return true; @@ -1368,7 +1375,7 @@ public class StorageGroupProcessor { String deviceId = deletion.getDevice(); for (TsFileResource tsFileResource : tsFileResourceList) { if (!tsFileResource.containsDevice(deviceId) || - deletion.getTimestamp() < tsFileResource.getStartTimeMap().get(deviceId)) { + deletion.getTimestamp() < tsFileResource.getStartTime(deviceId)) { continue; } @@ -1398,7 +1405,7 @@ public class StorageGroupProcessor { */ private void updateEndTimeMap(TsFileProcessor tsFileProcessor) { TsFileResource resource = tsFileProcessor.getTsFileResource(); - for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) { + for (Entry<String, Integer> startTime : resource.getDeviceToIndexMap().entrySet()) { String deviceId = startTime.getKey(); resource.forceUpdateEndTime(deviceId, latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId)); @@ -1494,8 +1501,9 @@ public class StorageGroupProcessor { List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources(); for (TsFileResource resource : upgradedResources) { long partitionId = resource.getTimePartition(); - resource.getEndTimeMap().forEach((device, time) -> - updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, time) + resource.getDeviceToIndexMap().forEach((device, index) -> + updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, + resource.getEndTime(device)) ); } insertLock.writeLock().lock(); @@ -1818,7 +1826,7 @@ public class StorageGroupProcessor { return POS_ALREADY_EXIST; } long localPartitionId = Long.parseLong(localFile.getFile().getParentFile().getName()); - if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty() + if (i == sequenceList.size() - 1 && localFile.areEndTimesEmpty() || newFilePartitionId > localPartitionId) { // skip files that are in the previous partition and the last empty file, as the all data // in those files must be older than the new file @@ -1852,14 +1860,14 @@ public class StorageGroupProcessor { */ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) { boolean hasPre = false, hasSubsequence = false; - for (String device : fileA.getStartTimeMap().keySet()) { - if (!fileB.getStartTimeMap().containsKey(device)) { + for (String device : fileA.getDeviceToIndexMap().keySet()) { + if (!fileB.getDeviceToIndexMap().containsKey(device)) { continue; } - long startTimeA = fileA.getStartTimeMap().get(device); - long endTimeA = fileA.getEndTimeMap().get(device); - long startTimeB = fileB.getStartTimeMap().get(device); - long endTimeB = fileB.getEndTimeMap().get(device); + long startTimeA = fileA.getStartTime(device); + long endTimeA = fileA.getEndTime(device); + long startTimeB = fileB.getStartTime(device); + long endTimeB = fileB.getEndTime(device); if (startTimeA > endTimeB) { // A's data of the device is later than to the B's data hasPre = true; @@ -1987,9 +1995,9 @@ public class StorageGroupProcessor { * @UsedBy sync module, load external tsfile module. */ private void updateLatestTimeMap(TsFileResource newTsFileResource) { - for (Entry<String, Long> entry : newTsFileResource.getEndTimeMap().entrySet()) { + for (Entry<String, Integer> entry : newTsFileResource.getDeviceToIndexMap().entrySet()) { String device = entry.getKey(); - long endTime = newTsFileResource.getEndTimeMap().get(device); + long endTime = newTsFileResource.getEndTime(device); long timePartitionId = StorageEngine.getTimePartition(endTime); if (!latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()) .containsKey(device) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index d51f68e..29e8a1b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -63,16 +63,22 @@ public class TsFileResource { public static final String RESOURCE_SUFFIX = ".resource"; static final String TEMP_SUFFIX = ".temp"; private static final String CLOSING_SUFFIX = ".closing"; + private static final int INIT_ARRAY_SIZE = 64; /** - * device -> start time + * start times array. */ - protected Map<String, Long> startTimeMap; + private long[] startTimes; /** - * device -> end time. It is null if it's an unsealed sequence tsfile + * end times array. The values in this array are -1 if it's an unsealed sequence tsfile */ - protected Map<String, Long> endTimeMap; + private long[] endTimes; + + /** + * device -> index of start times array and end times array + */ + private Map<String, Integer> deviceToIndex; public TsFileProcessor getProcessor() { return processor; @@ -136,8 +142,9 @@ public class TsFileResource { public TsFileResource(TsFileResource other) throws IOException { this.file = other.file; - this.startTimeMap = other.startTimeMap; - this.endTimeMap = other.endTimeMap; + this.deviceToIndex = other.deviceToIndex; + this.startTimes = other.startTimes; + this.endTimes = other.endTimes; this.processor = other.processor; this.modFile = other.modFile; this.closed = other.closed; @@ -156,8 +163,11 @@ public class TsFileResource { */ public TsFileResource(File file) { this.file = file; - this.startTimeMap = new ConcurrentHashMap<>(); - this.endTimeMap = new HashMap<>(); + this.deviceToIndex = new ConcurrentHashMap<>(); + this.startTimes = new long[INIT_ARRAY_SIZE]; + this.endTimes = new long[INIT_ARRAY_SIZE]; + initTimes(startTimes); + initTimes(endTimes); } /** @@ -165,8 +175,11 @@ public class TsFileResource { */ public TsFileResource(File file, TsFileProcessor processor) { this.file = file; - this.startTimeMap = new ConcurrentHashMap<>(); - this.endTimeMap = new ConcurrentHashMap<>(); + this.deviceToIndex = new ConcurrentHashMap<>(); + this.startTimes = new long[INIT_ARRAY_SIZE]; + this.endTimes = new long[INIT_ARRAY_SIZE]; + initTimes(startTimes); + initTimes(endTimes); this.processor = processor; } @@ -174,13 +187,15 @@ public class TsFileResource { * unsealed TsFile */ public TsFileResource(File file, - Map<String, Long> startTimeMap, - Map<String, Long> endTimeMap, + Map<String, Integer> deviceToIndex, + long[] startTimes, + long[] endTimes, List<ReadOnlyMemChunk> readOnlyMemChunk, List<ChunkMetadata> chunkMetadataList) throws IOException { this.file = file; - this.startTimeMap = startTimeMap; - this.endTimeMap = endTimeMap; + this.deviceToIndex = deviceToIndex; + this.startTimes = startTimes; + this.endTimes = endTimes; this.chunkMetadataList = chunkMetadataList; this.readOnlyMemChunk = readOnlyMemChunk; generateTimeSeriesMetadata(); @@ -221,18 +236,24 @@ public class TsFileResource { } } + private void initTimes(long[] times) { + for (int i = 0; i < times.length; i++) { + times[i] = -1; + } + } + public void serialize() throws IOException { try (OutputStream outputStream = fsFactory.getBufferedOutputStream( file + RESOURCE_SUFFIX + TEMP_SUFFIX)) { - ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream); - for (Entry<String, Long> entry : this.startTimeMap.entrySet()) { + ReadWriteIOUtils.write(this.deviceToIndex.size(), outputStream); + for (Entry<String, Integer> entry : this.deviceToIndex.entrySet()) { ReadWriteIOUtils.write(entry.getKey(), outputStream); - ReadWriteIOUtils.write(entry.getValue(), outputStream); + ReadWriteIOUtils.write(startTimes[entry.getValue()], outputStream); } - ReadWriteIOUtils.write(this.endTimeMap.size(), outputStream); - for (Entry<String, Long> entry : this.endTimeMap.entrySet()) { + ReadWriteIOUtils.write(this.deviceToIndex.size(), outputStream); + for (Entry<String, Integer> entry : this.deviceToIndex.entrySet()) { ReadWriteIOUtils.write(entry.getKey(), outputStream); - ReadWriteIOUtils.write(entry.getValue(), outputStream); + ReadWriteIOUtils.write(endTimes[entry.getValue()], outputStream); } if (historicalVersions != null) { @@ -252,21 +273,24 @@ public class TsFileResource { try (InputStream inputStream = fsFactory.getBufferedInputStream( file + RESOURCE_SUFFIX)) { int size = ReadWriteIOUtils.readInt(inputStream); - Map<String, Long> startTimes = new HashMap<>(); + Map<String, Integer> deviceToIndex = new HashMap<>(); + long[] startTimes = new long[size]; + long[] endTimes = new long[size]; for (int i = 0; i < size; i++) { String path = ReadWriteIOUtils.readString(inputStream); long time = ReadWriteIOUtils.readLong(inputStream); - startTimes.put(path, time); + deviceToIndex.put(path, i); + startTimes[i] = time; } size = ReadWriteIOUtils.readInt(inputStream); - Map<String, Long> endTimes = new HashMap<>(); for (int i = 0; i < size; i++) { - String path = ReadWriteIOUtils.readString(inputStream); + ReadWriteIOUtils.readString(inputStream); // String path long time = ReadWriteIOUtils.readLong(inputStream); - endTimes.put(path, time); + endTimes[i] = time; } - this.startTimeMap = startTimes; - this.endTimeMap = endTimes; + this.startTimes = startTimes; + this.endTimes = endTimes; + this.deviceToIndex = deviceToIndex; if (inputStream.available() > 0) { int versionSize = ReadWriteIOUtils.readInt(inputStream); @@ -283,16 +307,16 @@ public class TsFileResource { } public void updateStartTime(String device, long time) { - long startTime = startTimeMap.getOrDefault(device, Long.MAX_VALUE); + long startTime = getOrDefaultStartTime(device, Long.MAX_VALUE); if (time < startTime) { - startTimeMap.put(device, time); + putStartTime(device, time); } } public void updateEndTime(String device, long time) { - long endTime = endTimeMap.getOrDefault(device, Long.MIN_VALUE); + long endTime = getOrDefaultEndTime(device, Long.MIN_VALUE); if (time > endTime) { - endTimeMap.put(device, time); + putEndTime(device, time); } } @@ -301,7 +325,7 @@ public class TsFileResource { } void forceUpdateEndTime(String device, long time) { - endTimeMap.put(device, time); + putEndTime(device, time); } public List<ChunkMetadata> getChunkMetadataList() { @@ -324,7 +348,7 @@ public class TsFileResource { } boolean containsDevice(String deviceId) { - return startTimeMap.containsKey(deviceId); + return deviceToIndex.containsKey(deviceId); } public File getFile() { @@ -339,12 +363,90 @@ public class TsFileResource { return file.length(); } - public Map<String, Long> getStartTimeMap() { - return startTimeMap; + public long getStartTime(String deviceId) { + if (!deviceToIndex.containsKey(deviceId)) { + return -1; + } + return startTimes[deviceToIndex.get(deviceId)]; } - public Map<String, Long> getEndTimeMap() { - return endTimeMap; + public long getEndTime(String deviceId) { + if (!deviceToIndex.containsKey(deviceId)) { + return -1; + } + return endTimes[deviceToIndex.get(deviceId)]; + } + + public long getOrDefaultStartTime(String deviceId, long defaultTime) { + return getStartTime(deviceId) >= 0 ? startTimes[deviceToIndex.get(deviceId)] : defaultTime; + } + + public long getOrDefaultEndTime(String deviceId, long defaultTime) { + return getEndTime(deviceId) >= 0 ? endTimes[deviceToIndex.get(deviceId)] : defaultTime; + } + + public void putStartTime(String deviceId, long startTime) { + int index; + if (containsDevice(deviceId)) { + index = deviceToIndex.get(deviceId); + } + else { + index = deviceToIndex.size(); + deviceToIndex.put(deviceId, index); + if (startTimes.length <= index) { + enLargeArray(startTimes); + enLargeArray(endTimes); + } + } + startTimes[index] = startTime; + } + + public void putEndTime(String deviceId, long endTime) { + int index; + if (containsDevice(deviceId)) { + index = deviceToIndex.get(deviceId); + } + else { + index = deviceToIndex.size(); + deviceToIndex.put(deviceId, index); + if (endTimes.length <= index) { + enLargeArray(startTimes); + enLargeArray(endTimes); + } + } + endTimes[index] = endTime; + } + + private void enLargeArray(long[] array) { + long[] tmp = new long[array.length * 2]; + System.arraycopy(array, 0, tmp, 0, array.length); + array = tmp; + } + + public Map<String, Integer> getDeviceToIndexMap() { + return deviceToIndex; + } + + public long[] getStartTimes() { + return startTimes; + } + + public long[] getEndTimes() { + return endTimes; + } + + public void clearEndTimes() { + endTimes = new long[endTimes.length]; + initTimes(endTimes); + } + + public boolean areEndTimesEmpty() { + for (long endTime : endTimes) { + if (endTime != -1) { + return false; + } + } + return true; } public boolean isClosed() { @@ -446,7 +548,7 @@ public class TsFileResource { if (timeLowerBound == Long.MAX_VALUE) { return true; } - for (long endTime : endTimeMap.values()) { + for (long endTime : endTimes) { // the file cannot be deleted if any device still lives if (endTime >= timeLowerBound) { return true; @@ -455,12 +557,12 @@ public class TsFileResource { return false; } - protected void setStartTimeMap(Map<String, Long> startTimeMap) { - this.startTimeMap = startTimeMap; + protected void setStartTimes(long[] startTimes) { + this.startTimes = startTimes; } - protected void setEndTimeMap(Map<String, Long> endTimeMap) { - this.endTimeMap = endTimeMap; + protected void setEndTimes(long[] endTimes) { + this.endTimes= endTimes; } /** @@ -531,12 +633,12 @@ public class TsFileResource { } /** - * make sure Either the startTimeMap is not empty + * make sure Either the deviceToIndex is not empty * Or the path contains a partition folder */ public long getTimePartition() { - if (startTimeMap != null && !startTimeMap.isEmpty()) { - return StorageEngine.getTimePartition(startTimeMap.values().iterator().next()); + if (deviceToIndex != null && !deviceToIndex.isEmpty()) { + return StorageEngine.getTimePartition(startTimes[deviceToIndex.values().iterator().next()]); } String[] splits = FilePathUtils.splitTsFilePath(this); return Long.parseLong(splits[splits.length - 2]); @@ -550,7 +652,7 @@ public class TsFileResource { */ public long getTimePartitionWithCheck() throws PartitionViolationException { long partitionId = -1; - for (Long startTime : startTimeMap.values()) { + for (Long startTime : startTimes) { long p = StorageEngine.getTimePartition(startTime); if (partitionId == -1) { partitionId = p; @@ -560,7 +662,7 @@ public class TsFileResource { } } } - for (Long endTime : endTimeMap.values()) { + for (Long endTime : endTimes) { long p = StorageEngine.getTimePartition(endTime); if (partitionId == -1) { partitionId = p; diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java index 28c6870..33ab8a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java @@ -159,7 +159,7 @@ public class LastQueryExecutor { long version = 0; for (TsFileResource resource : unseqFileResources) { - if (resource.getEndTimeMap().get(seriesPath.getDevice()) < resultPair.getTimestamp()) { + if (resource.getEndTime(seriesPath.getDevice()) < resultPair.getTimestamp()) { continue; } TimeseriesMetadata timeseriesMetadata = diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java index ac80a16..4d48257 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.executor.fill; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; @@ -125,7 +124,7 @@ public class LastPointReader { sortUnSeqFileResourcesInDecendingOrder(dataSource.getUnseqResources()); while (!unseqFileResource.isEmpty() - && (lBoundTime <= unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()))) { + && (lBoundTime <= unseqFileResource.peek().getEndTime(seriesPath.getDevice()))) { TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors); @@ -183,11 +182,8 @@ public class LastPointReader { PriorityQueue<TsFileResource> unseqTsFilesSet = new PriorityQueue<>( (o1, o2) -> { - Map<String, Long> startTimeMap = o1.getEndTimeMap(); - Long minTimeOfO1 = startTimeMap.get(seriesPath.getDevice()); - Map<String, Long> startTimeMap2 = o2.getEndTimeMap(); - Long minTimeOfO2 = startTimeMap2.get(seriesPath.getDevice()); - + Long minTimeOfO1 = o1.getEndTime(seriesPath.getDevice()); + Long minTimeOfO2 = o2.getEndTime(seriesPath.getDevice()); return Long.compare(minTimeOfO2, minTimeOfO1); }); unseqTsFilesSet.addAll(tsFileResources); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 0e04e23..816be29 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -573,7 +573,7 @@ class SeriesReader { return tsFileResources.stream() .sorted( Comparator.comparingLong( - tsFileResource -> tsFileResource.getStartTimeMap().get(seriesPath.getDevice()))) + tsFileResource -> tsFileResource.getStartTime(seriesPath.getDevice()))) .collect(Collectors.toCollection(LinkedList::new)); } @@ -660,7 +660,7 @@ class SeriesReader { private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endTime) throws IOException { while (!unseqFileResource.isEmpty() - && endTime >= unseqFileResource.get(0).getStartTimeMap().get(seriesPath.getDevice())) { + && endTime >= unseqFileResource.get(0).getStartTime(seriesPath.getDevice())) { TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors); @@ -670,7 +670,7 @@ class SeriesReader { } } while (!seqFileResource.isEmpty() - && endTime >= seqFileResource.get(0).getStartTimeMap().get(seriesPath.getDevice())) { + && endTime >= seqFileResource.get(0).getStartTime(seriesPath.getDevice())) { TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( seqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors); diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java index 289d477..4eb911d 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java @@ -95,7 +95,7 @@ public class FileLoaderManager { throws SyncDeviceOwnerConflictException, IOException { String curOwner = tsFileResource.getFile().getParentFile().getParentFile().getParentFile() .getName(); - Set<String> deviceSet = tsFileResource.getStartTimeMap().keySet(); + Set<String> deviceSet = tsFileResource.getDeviceToIndexMap().keySet(); checkDeviceConflict(curOwner, deviceSet); updateDeviceOwner(curOwner, deviceSet); } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java b/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java index 68c2857..85ad43b 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/IoTDBDataDirViewer.java @@ -136,14 +136,14 @@ public class IoTDBDataDirViewer { TsFileResource resource = new TsFileResource(SystemFileFactory.INSTANCE.getFile(filename)); resource.deserialize(); // sort device strings - SortedSet<String> keys = new TreeSet<>(resource.getStartTimeMap().keySet()); + SortedSet<String> keys = new TreeSet<>(resource.getDeviceToIndexMap().keySet()); for (String device : keys) { printlnBoth(pw, String.format("| | | | |--device %s, start time %d (%s), end time %d (%s)", device, - resource.getStartTimeMap().get(device), DatetimeUtils - .convertMillsecondToZonedDateTime(resource.getStartTimeMap().get(device)), - resource.getEndTimeMap().get(device), DatetimeUtils - .convertMillsecondToZonedDateTime(resource.getEndTimeMap().get(device)))); + resource.getStartTime(device), DatetimeUtils + .convertMillsecondToZonedDateTime(resource.getStartTime(device)), + resource.getEndTime(device), DatetimeUtils + .convertMillsecondToZonedDateTime(resource.getEndTime(device)))); } } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java index 03e97d1..ffd2bc4 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java @@ -58,16 +58,16 @@ public class TsFileResourcePrinter { System.out.println("historicalVersions: " + resource.getHistoricalVersions()); - for (String device : resource.getStartTimeMap().keySet()) { + for (String device : resource.getDeviceToIndexMap().keySet()) { System.out.println(String.format( "device %s, " + "start time %d (%s), " + "end time %d (%s)", device, - resource.getStartTimeMap().get(device), - DatetimeUtils.convertMillsecondToZonedDateTime(resource.getStartTimeMap().get(device)), - resource.getEndTimeMap().get(device), - DatetimeUtils.convertMillsecondToZonedDateTime(resource.getEndTimeMap().get(device)))); + resource.getStartTime(device), + DatetimeUtils.convertMillsecondToZonedDateTime(resource.getStartTime(device)), + resource.getEndTime(device), + DatetimeUtils.convertMillsecondToZonedDateTime(resource.getEndTime(device)))); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java index 0f32e8e..1f479b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java @@ -127,8 +127,8 @@ public class LogReplayer { throws WriteProcessException, QueryProcessException { if (currentTsFileResource != null) { // the last chunk group may contain the same data with the logs, ignore such logs in seq file - Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertTabletPlan.getDeviceId()); - if (lastEndTime != null && lastEndTime >= insertTabletPlan.getMinTime() && + long lastEndTime = currentTsFileResource.getEndTime(insertTabletPlan.getDeviceId()); + if (lastEndTime >= 0 && lastEndTime >= insertTabletPlan.getMinTime() && !acceptDuplication) { return; } @@ -155,8 +155,8 @@ public class LogReplayer { private void replayInsert(InsertPlan insertPlan) { if (currentTsFileResource != null) { // the last chunk group may contain the same data with the logs, ignore such logs in seq file - Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId()); - if (lastEndTime != null && lastEndTime >= insertPlan.getTime() && + long lastEndTime = currentTsFileResource.getEndTime(insertPlan.getDeviceId()); + if (lastEndTime >= 0 && lastEndTime >= insertPlan.getTime() && !acceptDuplication) { return; } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 4398041..66068f6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -282,9 +282,9 @@ public class TsFileProcessorTest { private void closeTsFileProcessor(TsFileProcessor unsealedTsFileProcessor) throws TsFileProcessorException { TsFileResource resource = unsealedTsFileProcessor.getTsFileResource(); synchronized (resource) { - for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) { - String deviceId = startTime.getKey(); - resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId)); + for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) { + String deviceId = entry.getKey(); + resource.putEndTime(deviceId, resource.getStartTime(deviceId)); } try { resource.close(); diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java index 4324193..a30e18c 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java @@ -114,8 +114,8 @@ public class FileLoaderTest { LOGGER.error("Can not create new file {}", syncFile.getPath()); } TsFileResource tsFileResource = new TsFileResource(syncFile); - tsFileResource.getStartTimeMap().put(String.valueOf(i), (long) j * 10); - tsFileResource.getEndTimeMap().put(String.valueOf(i), (long) j * 10 + 5); + tsFileResource.putStartTime(String.valueOf(i), (long) j * 10); + tsFileResource.putEndTime(String.valueOf(i), (long) j * 10 + 5); tsFileResource.serialize(); } } @@ -209,8 +209,8 @@ public class FileLoaderTest { LOGGER.error("Can not create new file {}", syncFile.getPath()); } TsFileResource tsFileResource = new TsFileResource(syncFile); - tsFileResource.getStartTimeMap().put(String.valueOf(i), (long) j * 10); - tsFileResource.getEndTimeMap().put(String.valueOf(i), (long) j * 10 + 5); + tsFileResource.putStartTime(String.valueOf(i), (long) j * 10); + tsFileResource.putEndTime(String.valueOf(i), (long) j * 10 + 5); tsFileResource.serialize(); } } diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java index 3ce1019..1ea2f3a 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java @@ -125,8 +125,8 @@ public class SyncReceiverLogAnalyzerTest { LOGGER.error("Can not create new file {}", syncFile.getPath()); } TsFileResource tsFileResource = new TsFileResource(syncFile); - tsFileResource.getStartTimeMap().put(String.valueOf(i), (long) j * 10); - tsFileResource.getEndTimeMap().put(String.valueOf(i), (long) j * 10 + 5); + tsFileResource.putStartTime(String.valueOf(i), (long) j * 10); + tsFileResource.putEndTime(String.valueOf(i), (long) j * 10 + 5); tsFileResource.serialize(); } } diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java index fc1005c..fcf59c4 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java @@ -51,7 +51,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -135,11 +134,11 @@ public class LogReplayerTest { assertEquals(1, mods.length); assertEquals(new Deletion(new Path("root.sg.device0", "sensor0"), 5, 200), mods[0]); - assertEquals(2, (long) tsFileResource.getStartTimeMap().get("root.sg.device0")); - assertEquals(100, (long) tsFileResource.getEndTimeMap().get("root.sg.device0")); + assertEquals(2, (long) tsFileResource.getStartTime("root.sg.device0")); + assertEquals(100, (long) tsFileResource.getEndTime("root.sg.device0")); for (int i = 1; i < 5; i++) { - assertEquals(i, (long) tsFileResource.getStartTimeMap().get("root.sg.device" + i)); - assertEquals(i, (long) tsFileResource.getEndTimeMap().get("root.sg.device" + i)); + assertEquals(i, (long) tsFileResource.getStartTime("root.sg.device" + i)); + assertEquals(i, (long) tsFileResource.getEndTime("root.sg.device" + i)); } } finally { modFile.close(); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java index a881e01..26f66f5 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java @@ -179,11 +179,11 @@ public class RecoverResourceFromReaderTest { versionController, resource, true, false); ActiveTimeSeriesCounter.getInstance().init(resource.getFile().getParentFile().getParentFile().getName()); performer.recover(); - assertEquals(1, (long) resource.getStartTimeMap().get("root.sg.device99")); - assertEquals(300, (long) resource.getEndTimeMap().get("root.sg.device99")); + assertEquals(1, (long) resource.getStartTime("root.sg.device99")); + assertEquals(300, (long) resource.getEndTime("root.sg.device99")); for (int i = 0; i < 10; i++) { - assertEquals(0, (long) resource.getStartTimeMap().get("root.sg.device" + i)); - assertEquals(9, (long) resource.getEndTimeMap().get("root.sg.device" + i)); + assertEquals(0, (long) resource.getStartTime("root.sg.device" + i)); + assertEquals(9, (long) resource.getEndTime("root.sg.device" + i)); } } } diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java index 5c51d9a..1bcf36d 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java @@ -172,11 +172,11 @@ public class SeqTsFileRecoverTest { RestorableTsFileIOWriter writer = performer.recover(); assertFalse(writer.canWrite()); - assertEquals(2, (long) resource.getStartTimeMap().get("root.sg.device99")); - assertEquals(100, (long) resource.getEndTimeMap().get("root.sg.device99")); + assertEquals(2, (long) resource.getStartTime("root.sg.device99")); + assertEquals(100, (long) resource.getEndTime("root.sg.device99")); for (int i = 0; i < 10; i++) { - assertEquals(0, (long) resource.getStartTimeMap().get("root.sg.device" + i)); - assertEquals(19, (long) resource.getEndTimeMap().get("root.sg.device" + i)); + assertEquals(0, (long) resource.getStartTime("root.sg.device" + i)); + assertEquals(19, (long) resource.getEndTime("root.sg.device" + i)); } ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath())); @@ -226,11 +226,11 @@ public class SeqTsFileRecoverTest { assertTrue(writer.canWrite()); writer.endFile(); - assertEquals(2, (long) resource.getStartTimeMap().get("root.sg.device99")); - assertEquals(100, (long) resource.getEndTimeMap().get("root.sg.device99")); + assertEquals(2, (long) resource.getStartTime("root.sg.device99")); + assertEquals(100, (long) resource.getEndTime("root.sg.device99")); for (int i = 0; i < 10; i++) { - assertEquals(0, (long) resource.getStartTimeMap().get("root.sg.device" + i)); - assertEquals(19, (long) resource.getEndTimeMap().get("root.sg.device" + i)); + assertEquals(0, (long) resource.getStartTime("root.sg.device" + i)); + assertEquals(19, (long) resource.getEndTime("root.sg.device" + i)); } ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath())); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java index b285dd0..d9b25c6 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java @@ -179,11 +179,11 @@ public class UnseqTsFileRecoverTest { ActiveTimeSeriesCounter.getInstance().init(resource.getFile().getParentFile().getParentFile().getName()); performer.recover(); - assertEquals(1, (long) resource.getStartTimeMap().get("root.sg.device99")); - assertEquals(300, (long) resource.getEndTimeMap().get("root.sg.device99")); + assertEquals(1, (long) resource.getStartTime("root.sg.device99")); + assertEquals(300, (long) resource.getEndTime("root.sg.device99")); for (int i = 0; i < 10; i++) { - assertEquals(0, (long) resource.getStartTimeMap().get("root.sg.device" + i)); - assertEquals(9, (long) resource.getEndTimeMap().get("root.sg.device" + i)); + assertEquals(0, (long) resource.getStartTime("root.sg.device" + i)); + assertEquals(9, (long) resource.getEndTime("root.sg.device" + i)); } TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);
