codope commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713733466



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
##########
@@ -421,16 +593,329 @@ public String getBaseFileNameById(String fileId) {
   }
 
   public FileStatus[] listAllFilesInPartition(String partitionPath) throws 
IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, 
partitionPath).toString())).toArray(new FileStatus[0]);
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, 
partitionPath).toString())).stream()
+        .filter(entry -> {
+          boolean toReturn = true;
+          String fileName = entry.getPath().getName();
+          if 
(fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+            toReturn = false;
+          } else {
+            for (String inflight : inflightCommits) {
+              if (fileName.contains(inflight)) {
+                toReturn = false;
+                break;
+              }
+            }
+          }
+          return toReturn;
+        }).toArray(FileStatus[]::new);
   }
 
   public FileStatus[] listAllFilesInTempFolder() throws IOException {
     return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
   }
 
+  public void deleteFilesInPartition(String partitionPath, List<String> 
filesToDelete) throws IOException {
+    FileStatus[] allFiles = listAllFilesInPartition(partitionPath);
+    Arrays.stream(allFiles).filter(entry -> 
filesToDelete.contains(entry.getPath().getName())).forEach(entry -> {
+      try {
+        Files.delete(Paths.get(basePath, partitionPath, 
entry.getPath().getName()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+  }
+
+  public HoodieTestTable doRollback(String commitTimeToRollback, String 
commitTime) throws Exception {
+    Option<HoodieCommitMetadata> commitMetadata = 
getMetadataForInstant(commitTimeToRollback);
+    if (!commitMetadata.isPresent()) {
+      throw new IllegalArgumentException("Instant to rollback not present in 
timeline: " + commitTimeToRollback);
+    }
+    Map<String, List<String>> partitionFiles = 
getPartitionFiles(commitMetadata.get());
+    HoodieRollbackMetadata rollbackMetadata = 
getRollbackMetadata(commitTimeToRollback, partitionFiles);
+    for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    return addRollback(commitTime, rollbackMetadata);
+  }
+
+  public HoodieTestTable doCluster(String commitTime, Map<String, 
List<String>> partitionToReplaceFileIds) throws Exception {
+    Map<String, List<Pair<String, Integer>>> 
partitionToReplaceFileIdsWithLength = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : 
partitionToReplaceFileIds.entrySet()) {
+      String partition = entry.getKey();
+      partitionToReplaceFileIdsWithLength.put(entry.getKey(), new 
ArrayList<>());
+      for (String fileId : entry.getValue()) {
+        int length = 100 + RANDOM.nextInt(500);
+        partitionToReplaceFileIdsWithLength.get(partition).add(Pair.of(fileId, 
length));
+      }
+    }
+    List<HoodieWriteStat> writeStats = 
generateHoodieWriteStatForPartition(partitionToReplaceFileIdsWithLength, 
commitTime, false);
+    HoodieReplaceCommitMetadata replaceMetadata =
+        (HoodieReplaceCommitMetadata) buildMetadata(writeStats, 
partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING, 
REPLACE_COMMIT_ACTION);
+    return addReplaceCommit(commitTime, Option.empty(), Option.empty(), 
replaceMetadata);
+  }
+
+  public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> 
partitionFileCountsToDelete) throws IOException {
+    Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+    for (Map.Entry<String, Integer> entry : 
partitionFileCountsToDelete.entrySet()) {
+      partitionFilesToDelete.put(entry.getKey(), 
getEarliestFilesInPartition(entry.getKey(), entry.getValue()));
+    }
+    HoodieTestTableState testTableState = new HoodieTestTableState();
+    for (Map.Entry<String, List<String>> entry : 
partitionFilesToDelete.entrySet()) {
+      testTableState = 
testTableState.createTestTableStateForCleaner(commitTime, entry.getKey(), 
entry.getValue());
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    Pair<HoodieCleanerPlan, HoodieCleanMetadata> cleanerMeta = 
getHoodieCleanMetadata(commitTime, testTableState);
+    addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue());
+    return cleanerMeta.getValue();
+  }
+
+  public HoodieCleanMetadata doClean(String cleanCommitTime, List<String> 
commitsToClean) throws IOException {
+    Map<String, Integer> partitionFileCountsToDelete = new HashMap<>();
+    for (String commitTime : commitsToClean) {
+      Option<HoodieCommitMetadata> commitMetadata = 
getMetadataForInstant(commitTime);
+      if (commitMetadata.isPresent()) {
+        Map<String, List<String>> partitionFiles = 
getPartitionFiles(commitMetadata.get());
+        for (String partition : partitionFiles.keySet()) {
+          partitionFileCountsToDelete.put(partition, 
partitionFiles.get(partition).size() + 
partitionFileCountsToDelete.getOrDefault(partition, 0));
+        }
+      }
+    }
+    return doClean(cleanCommitTime, partitionFileCountsToDelete);
+  }
+
+  public HoodieSavepointMetadata doSavepoint(String commitTime) throws 
IOException {
+    Option<HoodieCommitMetadata> commitMetadata = 
getMetadataForInstant(commitTime);
+    if (!commitMetadata.isPresent()) {
+      throw new IllegalArgumentException("Instant to rollback not present in 
timeline: " + commitTime);
+    }
+    Map<String, List<String>> partitionFiles = 
getPartitionFiles(commitMetadata.get());
+    HoodieSavepointMetadata savepointMetadata = 
getSavepointMetadata(commitTime, partitionFiles);
+    for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) {
+      deleteFilesInPartition(entry.getKey(), entry.getValue());
+    }
+    return savepointMetadata;
+  }
+
+  public HoodieTestTable doCompaction(String commitTime, List<String> 
partitions) throws Exception {
+    this.currentInstantTime = commitTime;
+    if (partitions.isEmpty()) {
+      partitions = Collections.singletonList(EMPTY_STRING);
+    }
+    HoodieTestTableState testTableState = 
getTestTableStateWithPartitionFileInfo(metaClient.getTableType(), commitTime, 
partitions, 1);
+    HoodieCommitMetadata commitMetadata = createCommitMetadata(COMPACT, 
commitTime, testTableState);
+    for (String partition : partitions) {
+      this.withBaseFilesInPartition(partition, 
testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
+      if (MERGE_ON_READ.equals(metaClient.getTableType())) {
+        this.withLogFilesInPartition(partition, 
testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition));

Review comment:
       Got it. Removed this part.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to