This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch partitioned_file_version_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 67580273f8b9a881aed441f2151b5fc5c6e95013
Author: jt2594838 <[email protected]>
AuthorDate: Mon Mar 23 17:34:29 2020 +0800

    integrate data partition with file version management
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |   6 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 171 +++++++++++++--------
 2 files changed, 110 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index c63f728..a838a54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -549,10 +549,10 @@ public class StorageEngine implements IService {
     this.fileFlushPolicy = fileFlushPolicy;
   }
 
-  public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup) {
-    // TODO-Cluster#350: integrate with time partitioning
+  public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup,
+      long partitionNum) {
     StorageGroupProcessor processor = processorMap.get(storageGroup);
-    return processor != null && processor.isFileAlreadyExist(tsFileResource);
+    return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum);
   }
 
   public static long getTimePartitionInterval() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 295a074..c490723 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,28 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -40,7 +62,11 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -53,6 +79,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -73,17 +100,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in 
which there is only one
@@ -203,9 +219,16 @@ public class StorageGroupProcessor {
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
   private TsFileFlushPolicy fileFlushPolicy;
 
-  // allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not
-  // including the files generated by merge
-  private Set<Long> allDirectFileVersions = new HashSet<>();
+  /**
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
+   * not including the files generated by merge) of each partition.
+   * As data file close is managed by the leader in the distributed version, the files with the
+   * same version(s) have the same data, despite that the inner structure (the size and
+   * organization of chunks) may be different, so we can easily find what remote files we do not
+   * have locally.
+   * partition number -> version number set
+   */
+  private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   public StorageGroupProcessor(String systemInfoDir, String storageGroupName,
       TsFileFlushPolicy fileFlushPolicy)
@@ -246,14 +269,18 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        allDirectFileVersions.addAll(resource.getHistoricalVersions());
+        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length 
- 2]);
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>()).addAll(resource.getHistoricalVersions());
       }
       for (TsFileResource resource : unseqTsFiles) {
         //After recover, case the TsFile's length is equal to 0, delete both 
the TsFileResource and the file itself
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        allDirectFileVersions.addAll(resource.getHistoricalVersions());
+        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length 
- 2]);
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>()).addAll(resource.getHistoricalVersions());
       }
 
       String taskName = storageGroupName + "-" + System.currentTimeMillis();
@@ -307,17 +334,15 @@ public class StorageGroupProcessor {
    * @return version controller
    */
   private VersionController getVersionControllerByTimePartitionId(long 
timePartitionId) {
-    VersionController res = 
timePartitionIdVersionControllerMap.get(timePartitionId);
-    if (res == null) {
-      try {
-        res = new SimpleFileVersionController(storageGroupSysDir.getPath(), 
timePartitionId);
-        timePartitionIdVersionControllerMap.put(timePartitionId, res);
-      } catch (IOException e) {
-        logger.error("can't build a version controller for time partition" + 
timePartitionId);
-      }
-    }
-
-    return res;
+    return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
+        id -> {
+          try {
+            return new 
SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
+          } catch (IOException e) {
+            logger.error("can't build a version controller for time partition 
{}", timePartitionId);
+            return null;
+          }
+        });
   }
 
   private List<TsFileResource> getAllFiles(List<String> folders) {
@@ -328,17 +353,20 @@ public class StorageGroupProcessor {
         continue;
       }
 
-      for (File timeRangeFileFolder : fileFolder.listFiles()) {
-        // some TsFileResource may be being persisted when the system crashed, 
try recovering such
-        // resources
-        continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX);
+      File[] subFiles = fileFolder.listFiles();
+      if (subFiles != null) {
+        for (File timeRangeFileFolder : subFiles) {
+          // some TsFileResource may be being persisted when the system 
crashed, try recovering such
+          // resources
+          continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX);
 
-        // some TsFiles were going to be replaced by the merged files when the 
system crashed and
-        // the process was interrupted before the merged files could be named
-        continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX);
+          // some TsFiles were going to be replaced by the merged files when 
the system crashed and
+          // the process was interrupted before the merged files could be named
+          continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX);
 
-        Collections.addAll(tsFiles,
-            fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), 
TSFILE_SUFFIX));
+          Collections.addAll(tsFiles,
+              
fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), 
TSFILE_SUFFIX));
+        }
       }
 
     }
@@ -796,12 +824,12 @@ public class StorageGroupProcessor {
    * @return file name
    */
   private String getNewTsFileName(long timePartitionId) {
-    return getNewTsFileName(System.currentTimeMillis(),
-        getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), 
0);
+    long version = 
getVersionControllerByTimePartitionId(timePartitionId).nextVersion();
+    partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new 
HashSet<>()).add(version);
+    return getNewTsFileName(System.currentTimeMillis(), version, 0);
   }
 
   private String getNewTsFileName(long time, long version, int mergeCnt) {
-    allDirectFileVersions.add(version);
     return time + IoTDBConstant.TSFILE_NAME_SEPARATOR + version
         + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
   }
@@ -1185,22 +1213,7 @@ public class StorageGroupProcessor {
       // time partition to divide storage group
       long timePartitionId = StorageEngine.fromTimeToTimePartition(timestamp);
       // write log
-      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, 
measurementId));
-        for (Map.Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
-          if (entry.getKey() <= timePartitionId) {
-            entry.getValue().getLogNode()
-                .write(deletionPlan);
-          }
-        }
-
-        for (Map.Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
-          if (entry.getKey() <= timePartitionId) {
-            entry.getValue().getLogNode()
-                .write(deletionPlan);
-          }
-        }
-      }
+      logDeletion(timestamp, deviceId, measurementId, timePartitionId);
 
       Path fullPath = new Path(deviceId, measurementId);
       Deletion deletion = new Deletion(fullPath,
@@ -1225,6 +1238,26 @@ public class StorageGroupProcessor {
     }
   }
 
+  private void logDeletion(long timestamp, String deviceId, String 
measurementId, long timePartitionId)
+      throws IOException {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+      DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, 
measurementId));
+      for (Map.Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
+        if (entry.getKey() <= timePartitionId) {
+          entry.getValue().getLogNode()
+              .write(deletionPlan);
+        }
+      }
+
+      for (Map.Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
+        if (entry.getKey() <= timePartitionId) {
+          entry.getValue().getLogNode()
+              .write(deletionPlan);
+        }
+      }
+    }
+  }
+
 
   private void deleteDataInFiles(Collection<TsFileResource> 
tsFileResourceList, Deletion deletion,
       List<ModificationFile> updatedModFiles)
@@ -1559,6 +1592,7 @@ public class StorageGroupProcessor {
   public void loadNewTsFile(TsFileResource newTsFileResource)
       throws TsFileProcessorException {
     File tsfileToBeInserted = newTsFileResource.getFile();
+    long newFilePartitionId = Long.parseLong(tsfileToBeInserted.getParent());
     writeLock();
     mergeLock.writeLock().lock();
     try {
@@ -1569,17 +1603,20 @@ public class StorageGroupProcessor {
       // check new tsfile
       outer:
       for (int i = 0; i < sequenceList.size(); i++) {
-        if 
(sequenceList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
+        TsFileResource localFile = sequenceList.get(i);
+        if 
(localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
           return;
         }
-        if (i == sequenceList.size() - 1 && 
sequenceList.get(i).getEndTimeMap().isEmpty()) {
+        long localPartitionId = 
Long.parseLong(localFile.getFile().getParent());
+        if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
+            || newFilePartitionId != localPartitionId) {
           continue;
         }
         boolean hasPre = false, hasSubsequence = false;
         for (String device : newTsFileResource.getStartTimeMap().keySet()) {
-          if (sequenceList.get(i).getStartTimeMap().containsKey(device)) {
-            long startTime1 = 
sequenceList.get(i).getStartTimeMap().get(device);
-            long endTime1 = sequenceList.get(i).getEndTimeMap().get(device);
+          if (localFile.getStartTimeMap().containsKey(device)) {
+            long startTime1 = localFile.getStartTimeMap().get(device);
+            long endTime1 = localFile.getEndTimeMap().get(device);
             long startTime2 = newTsFileResource.getStartTimeMap().get(device);
             long endTime2 = newTsFileResource.getEndTimeMap().get(device);
             if (startTime1 > endTime2) {
@@ -1625,7 +1662,10 @@ public class StorageGroupProcessor {
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
-      allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions());
+      String[] filePathSplit = 
FilePathUtils.splitTsFilePath(newTsFileResource);
+      long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 
2]);
+      partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>())
+          .addAll(newTsFileResource.getHistoricalVersions());
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} 
because the disk space is insufficient.",
@@ -1642,8 +1682,10 @@ public class StorageGroupProcessor {
    * If the historical versions of a file is a sub-set of the given file's, 
remove it to reduce
    * unnecessary merge. Only used when the file sender and the receiver share 
the same file
    * close policy.
+   * Warning: DO NOT REMOVE
    * @param resource
    */
+  @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
     writeLock();
     closeQueryLock.writeLock().lock();
@@ -1985,8 +2027,9 @@ public class StorageGroupProcessor {
     return storageGroupName;
   }
 
-  public boolean isFileAlreadyExist(TsFileResource tsFileResource) {
-    return 
allDirectFileVersions.containsAll(tsFileResource.getHistoricalVersions());
+  public boolean isFileAlreadyExist(TsFileResource tsFileResource, long 
partitionNum) {
+    return partitionDirectFileVersions.getOrDefault(partitionNum, 
Collections.emptySet())
+        .containsAll(tsFileResource.getHistoricalVersions());
   }
 
   @FunctionalInterface

Reply via email to