This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb39b2a6968 [MINOR] Avoid scanning raw timeline on archival if no instants to archive (#12482)
eb39b2a6968 is described below

commit eb39b2a696864e1e152c1d6ef1878b17ea9133fc
Author: Tim Brown <[email protected]>
AuthorDate: Fri Dec 13 09:12:42 2024 -0600

    [MINOR] Avoid scanning raw timeline on archival if no instants to archive (#12482)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  4 --
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  3 ++
 .../timeline/versioning/v1/TimelineArchiverV1.java | 54 +++++++++++++---------
 .../timeline/versioning/v2/TimelineArchiverV2.java |  4 ++
 4 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e96a6707247..81256ddcbb9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -826,10 +826,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
     try {
       final Timer.Context timerContext = metrics.getArchiveCtx();
       // We cannot have unbounded commit files. Archive commits if we have to 
archive.
-
-      // Reload table timeline to reflect the latest commits,
-      // there are some table services (for e.g, the cleaning) that executed 
right before the archiving.
-      table.getMetaClient().reloadActiveTimeline();
       HoodieTimelineArchiver archiver = 
TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), 
config, table);
       int instantsToArchive = archiver.archiveIfRequired(context, true);
       if (timerContext != null) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index aeefb09a114..e451ff50893 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -613,6 +613,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       LOG.info("Async archiver has finished");
     } else {
       LOG.info("Start to archive synchronously.");
+      // Reload table timeline to reflect the latest commits,
+      // there are some table services (for e.g, the cleaning) that executed 
right before the archiving.
+      table.getMetaClient().reloadActiveTimeline();
       archive(table);
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index c539811d149..8108d3b3ed9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -139,14 +140,13 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
         // there is no owner or instant time per se for archival.
         txnManager.beginTransaction(Option.empty(), Option.empty());
       }
-      List<HoodieInstant> instantsToArchive = 
getInstantsToArchive().collect(Collectors.toList());
-      boolean success = true;
+      List<HoodieInstant> instantsToArchive = getInstantsToArchive();
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter(archiveFilePath.getParent());
-        LOG.info("Archiving instants " + instantsToArchive);
+        LOG.info("Archiving instants {}", instantsToArchive);
         archive(context, instantsToArchive);
-        LOG.info("Deleting archived instants " + instantsToArchive);
-        success = deleteArchivedInstants(instantsToArchive, context);
+        LOG.info("Deleting archived instants {}", instantsToArchive);
+        deleteArchivedInstants(instantsToArchive, context);
       } else {
         LOG.info("No Instants to archive");
       }
@@ -280,17 +280,18 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
     }
   }
 
-  private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
-    Stream<HoodieInstant> instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+  private List<HoodieInstant> getInstantsToArchive() throws IOException {
     if (config.isMetaserverEnabled()) {
-      return Stream.empty();
+      return Collections.emptyList();
     }
 
-    // For archiving and cleaning instants, we need to include intermediate 
state files if they exist
-    HoodieActiveTimeline rawActiveTimeline = new ActiveTimelineV1(metaClient, 
false);
-    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = 
rawActiveTimeline.getInstantsAsStream()
-        .collect(Collectors.groupingBy(i -> Pair.of(i.requestedTime(),
-            InstantComparatorV1.getComparableAction(i.getAction()))));
+    List<HoodieInstant> candidates = 
Stream.concat(getCleanInstantsToArchive(), 
getCommitInstantsToArchive()).collect(Collectors.toList());
+    if (candidates.isEmpty()) {
+      // exit early to avoid loading meta client for metadata table
+      return Collections.emptyList();
+    }
+
+    Stream<HoodieInstant> instants = candidates.stream();
 
     // If metadata table is enabled, do not archive instants which are more 
recent than the last compaction on the
     // metadata table.
@@ -339,16 +340,23 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
       }
     }
 
-    return instants.flatMap(hoodieInstant -> {
-      List<HoodieInstant> instantsToStream = 
groupByTsAction.get(Pair.of(hoodieInstant.requestedTime(),
-          InstantComparatorV1.getComparableAction(hoodieInstant.getAction())));
-      if (instantsToStream != null) {
-        return instantsToStream.stream();
-      } else {
-        // if a concurrent writer archived the instant
-        return Stream.empty();
-      }
-    });
+    List<HoodieInstant> instantsToArchive = 
instants.collect(Collectors.toList());
+    if (instantsToArchive.isEmpty()) {
+      // Exit early to avoid loading raw timeline
+      return Collections.emptyList();
+    }
+
+    // For archiving and cleaning instants, we need to include intermediate 
state files if they exist
+    HoodieActiveTimeline rawActiveTimeline = new ActiveTimelineV1(metaClient, 
false);
+    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = 
rawActiveTimeline.getInstantsAsStream()
+        .collect(Collectors.groupingBy(i -> Pair.of(i.requestedTime(),
+            InstantComparatorV1.getComparableAction(i.getAction()))));
+
+    return instantsToArchive.stream()
+        .flatMap(hoodieInstant ->
+            groupByTsAction.getOrDefault(Pair.of(hoodieInstant.requestedTime(),
+                
InstantComparatorV1.getComparableAction(hoodieInstant.getAction())), 
Collections.emptyList()).stream())
+        .collect(Collectors.toList());
   }
 
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, 
HoodieEngineContext context) throws IOException {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index fd376868111..40f3abbc2bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -298,6 +298,10 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
       
instantsToArchive.sort(InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
     }
 
+    if (instantsToArchive.isEmpty()) {
+      return Stream.empty();
+    }
+
     // For archive, we need to include instant's all states.
     // The re-instantiation of the timeline may result in inconsistencies with 
the existing meta client active timeline,
     // When there is no lock guard of the archiving process, the 'raw' 
timeline could contain less distinct instants

Reply via email to