This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8d0fe3c50ec2ed072ebbbb397588bf29b777cb46
Author: Yann Byron <biyan900...@gmail.com>
AuthorDate: Wed Nov 30 18:11:23 2022 +0800

    [HUDI-5279] move logic for deleting active instant to HoodieActiveTimeline (#7196)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 40 ++++++++++++----------
 .../table/timeline/HoodieActiveTimeline.java       | 14 +++++---
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index bb814f817d0..a61a5c90082 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -537,15 +537,14 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, 
HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
 
-    List<String> pendingInstantFiles = new ArrayList<>();
-    List<String> completedInstantFiles = new ArrayList<>();
+    List<HoodieInstant> pendingInstants = new ArrayList<>();
+    List<HoodieInstant> completedInstants = new ArrayList<>();
 
     for (HoodieInstant instant : archivedInstants) {
-      String filePath = new Path(metaClient.getMetaPath(), 
instant.getFileName()).toString();
       if (instant.isCompleted()) {
-        completedInstantFiles.add(filePath);
+        completedInstants.add(instant);
       } else {
-        pendingInstantFiles.add(filePath);
+        pendingInstants.add(instant);
       }
     }
 
@@ -556,27 +555,30 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
     // other monitors on the timeline(such as the compaction or clustering 
services) would
     // mistakenly recognize the pending file as a pending operation,
     // then all kinds of weird bugs occur.
-    boolean success = deleteArchivedInstantFiles(context, true, 
pendingInstantFiles);
-    success &= deleteArchivedInstantFiles(context, success, 
completedInstantFiles);
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    if (!pendingInstants.isEmpty()) {
+      context.foreach(
+          pendingInstants,
+          instant -> activeTimeline.deleteInstantFileIfExists(instant),
+          Math.min(pendingInstants.size(), 
config.getArchiveDeleteParallelism())
+      );
+    }
+    if (!completedInstants.isEmpty()) {
+      context.foreach(
+          completedInstants,
+          instant -> activeTimeline.deleteInstantFileIfExists(instant),
+          Math.min(completedInstants.size(), 
config.getArchiveDeleteParallelism())
+      );
+    }
 
     // Remove older meta-data from auxiliary path too
     Option<HoodieInstant> latestCommitted = 
Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() 
&& (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
         || 
(i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
     LOG.info("Latest Committed Instant=" + latestCommitted);
     if (latestCommitted.isPresent()) {
-      success &= 
deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
+      return 
deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
     }
-    return success;
-  }
-
-  private boolean deleteArchivedInstantFiles(HoodieEngineContext context, 
boolean success, List<String> files) {
-    Map<String, Boolean> resultDeleteInstantFiles = 
deleteFilesParallelize(metaClient, files, context, false);
-
-    for (Map.Entry<String, Boolean> result : 
resultDeleteInstantFiles.entrySet()) {
-      LOG.info("Archived and deleted instant file " + result.getKey() + " : " 
+ result.getValue());
-      success &= result.getValue();
-    }
-    return success;
+    return true;
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 96e22e9dac0..7dc3acc94e7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -265,22 +265,26 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  /**
+   * Note: This method should only be used to delete a requested/inflight instant or an empty clean instant,
+   * or to delete a completed commit instant as part of an archive operation.
+   */
   public void deleteInstantFileIfExists(HoodieInstant instant) {
     LOG.info("Deleting instant " + instant);
-    Path inFlightCommitFilePath = 
getInstantFileNamePath(instant.getFileName());
+    Path commitFilePath = getInstantFileNamePath(instant.getFileName());
     try {
-      if (metaClient.getFs().exists(inFlightCommitFilePath)) {
-        boolean result = metaClient.getFs().delete(inFlightCommitFilePath, 
false);
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean result = metaClient.getFs().delete(commitFilePath, false);
         if (result) {
           LOG.info("Removed instant " + instant);
         } else {
           throw new HoodieIOException("Could not delete instant " + instant);
         }
       } else {
-        LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not 
exist");
+        LOG.warn("The commit " + commitFilePath + " to remove does not exist");
       }
     } catch (IOException e) {
-      throw new HoodieIOException("Could not remove inflight commit " + 
inFlightCommitFilePath, e);
+      throw new HoodieIOException("Could not remove commit " + commitFilePath, 
e);
     }
   }
 

Reply via email to