This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch partitioned_file_version_management in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 67580273f8b9a881aed441f2151b5fc5c6e95013 Author: jt2594838 <[email protected]> AuthorDate: Mon Mar 23 17:34:29 2020 +0800 integrate data partition with file version management --- .../org/apache/iotdb/db/engine/StorageEngine.java | 6 +- .../engine/storagegroup/StorageGroupProcessor.java | 171 +++++++++++++-------- 2 files changed, 110 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index c63f728..a838a54 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -549,10 +549,10 @@ public class StorageEngine implements IService { this.fileFlushPolicy = fileFlushPolicy; } - public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup) { - // TODO-Cluster#350: integrate with time partitioning + public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup, + long partitionNum) { StorageGroupProcessor processor = processorMap.get(storageGroup); - return processor != null && processor.isFileAlreadyExist(tsFileResource); + return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum); } public static long getTimePartitionInterval() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 295a074..c490723 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,6 +18,28 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -40,7 +62,11 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SimpleFileVersionController; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.*; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.exception.MergeException; +import org.apache.iotdb.db.exception.StorageGroupProcessorException; +import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -53,6 +79,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryFileManager; import org.apache.iotdb.db.utils.CopyOnReadLinkedList; +import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.UpgradeUtils; import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer; import org.apache.iotdb.rpc.RpcUtils; @@ -73,17 +100,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one @@ -203,9 +219,16 @@ public class StorageGroupProcessor { private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); private TsFileFlushPolicy fileFlushPolicy; - // allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not - // including the files generated by merge - private Set<Long> allDirectFileVersions = new HashSet<>(); + /** + * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, + * not including the files generated by merge) of each partition. + * As data file close is managed by the leader in the distributed version, the files with the + * same version(s) have the same data, despite that the inner structure (the size and + * organization of chunks) may be different, so we can easily find what remote files we do not + * have locally. + * partition number -> version number set + */ + private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>(); public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy) @@ -246,14 +269,18 @@ public class StorageGroupProcessor { if (resource.getFile().length() == 0) { deleteTsfile(resource.getFile()); } - allDirectFileVersions.addAll(resource.getHistoricalVersions()); + String[] filePathSplit = FilePathUtils.splitTsFilePath(resource); + long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions()); } for (TsFileResource resource : unseqTsFiles) { //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself if (resource.getFile().length() == 0) { deleteTsfile(resource.getFile()); } - allDirectFileVersions.addAll(resource.getHistoricalVersions()); + String[] filePathSplit = FilePathUtils.splitTsFilePath(resource); + long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions()); } String taskName = storageGroupName + "-" + System.currentTimeMillis(); @@ -307,17 +334,15 @@ public class StorageGroupProcessor { * @return version controller */ private VersionController getVersionControllerByTimePartitionId(long timePartitionId) { - VersionController res = timePartitionIdVersionControllerMap.get(timePartitionId); - if (res == null) { - try { - res = new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId); - timePartitionIdVersionControllerMap.put(timePartitionId, res); - } catch (IOException e) { - logger.error("can't build a version controller for time partition" + timePartitionId); - } - } - - return res; + return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId, + id -> { + try { + return new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId); + } catch (IOException e) { + logger.error("can't build a version controller for time partition {}", timePartitionId); + return null; + } + }); } private List<TsFileResource> getAllFiles(List<String> folders) { @@ -328,17 +353,20 @@ public class StorageGroupProcessor { continue; } - for (File timeRangeFileFolder : fileFolder.listFiles()) { - // some TsFileResource may be being persisted when the system crashed, try recovering such - // resources - continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX); + File[] subFiles = fileFolder.listFiles(); + if (subFiles != null) { + for (File timeRangeFileFolder : subFiles) { + // some TsFileResource may be being persisted when the system crashed, try recovering such + // resources + continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX); - // some TsFiles were going to be replaced by the merged files when the system crashed and - // the process was interrupted before the merged files could be named - continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX); + // some TsFiles were going to be replaced by the merged files when the system crashed and + // the process was interrupted before the merged files could be named + continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX); - Collections.addAll(tsFiles, - fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), TSFILE_SUFFIX)); + Collections.addAll(tsFiles, + fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), TSFILE_SUFFIX)); + } } } @@ -796,12 +824,12 @@ public class StorageGroupProcessor { * @return file name */ private String getNewTsFileName(long timePartitionId) { - return getNewTsFileName(System.currentTimeMillis(), - getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), 0); + long version = getVersionControllerByTimePartitionId(timePartitionId).nextVersion(); + partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new HashSet<>()).add(version); + return getNewTsFileName(System.currentTimeMillis(), version, 0); } private String getNewTsFileName(long time, long version, int mergeCnt) { - allDirectFileVersions.add(version); return time + IoTDBConstant.TSFILE_NAME_SEPARATOR + version + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX; } @@ -1185,22 +1213,7 @@ public class StorageGroupProcessor { // time partition to divide storage group long timePartitionId = StorageEngine.fromTimeToTimePartition(timestamp); // write log - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId)); - for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { - if (entry.getKey() <= timePartitionId) { - entry.getValue().getLogNode() - .write(deletionPlan); - } - } - - for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { - if (entry.getKey() <= timePartitionId) { - entry.getValue().getLogNode() - .write(deletionPlan); - } - } - } + logDeletion(timestamp, deviceId, measurementId, timePartitionId); Path fullPath = new Path(deviceId, measurementId); Deletion deletion = new Deletion(fullPath, @@ -1225,6 +1238,26 @@ public class StorageGroupProcessor { } } + private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId) + throws IOException { + if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { + DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId)); + for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { + if (entry.getKey() <= timePartitionId) { + entry.getValue().getLogNode() + .write(deletionPlan); + } + } + + for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { + if (entry.getKey() <= timePartitionId) { + entry.getValue().getLogNode() + .write(deletionPlan); + } + } + } + } + private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion, List<ModificationFile> updatedModFiles) @@ -1559,6 +1592,7 @@ public class StorageGroupProcessor { public void loadNewTsFile(TsFileResource newTsFileResource) throws TsFileProcessorException { File tsfileToBeInserted = newTsFileResource.getFile(); + long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParent()); writeLock(); mergeLock.writeLock().lock(); try { @@ -1569,17 +1603,20 @@ public class StorageGroupProcessor { // check new tsfile outer: for (int i = 0; i < sequenceList.size(); i++) { - if (sequenceList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) { + TsFileResource localFile = sequenceList.get(i); + if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) { return; } - if (i == sequenceList.size() - 1 && sequenceList.get(i).getEndTimeMap().isEmpty()) { + long localPartitionId = Long.parseLong(localFile.getFile().getParent()); + if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty() + || newFilePartitionId != localPartitionId) { continue; } boolean hasPre = false, hasSubsequence = false; for (String device : newTsFileResource.getStartTimeMap().keySet()) { - if (sequenceList.get(i).getStartTimeMap().containsKey(device)) { - long startTime1 = sequenceList.get(i).getStartTimeMap().get(device); - long endTime1 = sequenceList.get(i).getEndTimeMap().get(device); + if (localFile.getStartTimeMap().containsKey(device)) { + long startTime1 = localFile.getStartTimeMap().get(device); + long endTime1 = localFile.getEndTimeMap().get(device); long startTime2 = newTsFileResource.getStartTimeMap().get(device); long endTime2 = newTsFileResource.getEndTimeMap().get(device); if (startTime1 > endTime2) { @@ -1625,7 +1662,10 @@ public class StorageGroupProcessor { // update latest time map updateLatestTimeMap(newTsFileResource); - allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions()); + String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource); + long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()) + .addAll(newTsFileResource.getHistoricalVersions()); } catch (DiskSpaceInsufficientException e) { logger.error( "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", @@ -1642,8 +1682,10 @@ public class StorageGroupProcessor { * If the historical versions of a file is a sub-set of the given file's, remove it to reduce * unnecessary merge. Only used when the file sender and the receiver share the same file * close policy. + * Warning: DO NOT REMOVE * @param resource */ + @SuppressWarnings("unused") public void removeFullyOverlapFiles(TsFileResource resource) { writeLock(); closeQueryLock.writeLock().lock(); @@ -1985,8 +2027,9 @@ public class StorageGroupProcessor { return storageGroupName; } - public boolean isFileAlreadyExist(TsFileResource tsFileResource) { - return allDirectFileVersions.containsAll(tsFileResource.getHistoricalVersions()); + public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) { + return partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet()) + .containsAll(tsFileResource.getHistoricalVersions()); } @FunctionalInterface
