This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a2c5f0a51 [HUDI-7284] Fix cluster stream sync check (#10501)
e9a2c5f0a51 is described below

commit e9a2c5f0a5131f82c9020e6f43a42f65136bc65f
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri Jan 19 15:26:08 2024 -0500

    [HUDI-7284] Fix cluster stream sync check (#10501)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../table/timeline/HoodieDefaultTimeline.java      | 17 +++---------
 .../apache/hudi/common/util/ClusteringUtils.java   | 30 ++++++++++++++++------
 .../hudi/common/util/TestClusteringUtils.java      | 15 +++++++++++
 3 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index ac0cb83cf15..1918a944fea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.timeline;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -517,21 +518,9 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
 
   @Override
   public Option<HoodieInstant> getLastPendingClusterCommit() {
-    return  Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+    return  Option.fromJavaOptional(filterPendingReplaceTimeline()
         .getReverseOrderedInstants()
-        .filter(i -> {
-          try {
-            if (!i.isCompleted()) {
-              HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, this);
-              return metadata.getOperationType().equals(WriteOperationType.CLUSTER);
-            } else {
-              return false;
-            }
-          } catch (IOException e) {
-            LOG.warn("Unable to read commit metadata for " + i + " due to " + e.getMessage());
-            return false;
-          }
-        }).findFirst());
+        .filter(i -> ClusteringUtils.isPendingClusteringInstant(this, i)).findFirst());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index eea644a0bbc..b7ed15a07af 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -82,12 +82,12 @@ public class ClusteringUtils {
 
   /**
    * Get requested replace metadata from timeline.
-   * @param metaClient
-   * @param pendingReplaceInstant
-   * @return
+   * @param timeline used to get the bytes stored in the requested replace instant in the timeline
+   * @param pendingReplaceInstant can be in any state, because it will always be converted to requested state
+   * @return option of the replace metadata if present, else empty
    * @throws IOException
    */
-  private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) throws IOException {
+  private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant) throws IOException {
     final HoodieInstant requestedInstant;
     if (!pendingReplaceInstant.isRequested()) {
       // inflight replacecommit files don't have clustering plan.
@@ -97,7 +97,7 @@ public class ClusteringUtils {
     } else {
       requestedInstant = pendingReplaceInstant;
     }
-    Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
+    Option<byte[]> content = timeline.getInstantDetails(requestedInstant);
     if (!content.isPresent() || content.get().length == 0) {
       // few operations create requested file without any content. Assume these are not clustering
       return Option.empty();
@@ -107,13 +107,23 @@ public class ClusteringUtils {
 
   /**
    * Get Clustering plan from timeline.
-   * @param metaClient
+   * @param metaClient used to get the active timeline
+   * @param pendingReplaceInstant can be in any state, because it will always be converted to requested state
+   * @return option of the replace metadata if present, else empty
+   */
+  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
+    return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant);
+  }
+
+  /**
+   * Get Clustering plan from timeline.
+   * @param timeline
    * @param pendingReplaceInstant
    * @return
    */
-  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
+  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant) {
     try {
-      Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(metaClient, pendingReplaceInstant);
+      Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(timeline, pendingReplaceInstant);
       if (requestedReplaceMetadata.isPresent() && WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType())) {
         return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.get().getClusteringPlan()));
       }
@@ -235,6 +245,10 @@ public class ClusteringUtils {
     return getClusteringPlan(metaClient, instant).isPresent();
   }
 
+  public static boolean isPendingClusteringInstant(HoodieTimeline timeline, HoodieInstant instant) {
+    return getClusteringPlan(timeline, instant).isPresent();
+  }
+
   /**
    * Returns the earliest instant to retain.
    * Make sure the clustering instant won't be archived before cleaned, and the earliest inflight clustering instant has a previous commit.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index fb3adaf1c01..bd8dc707651 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -104,6 +104,21 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
     validateClusteringInstant(fileIds1, partitionPath1, clusterTime1, fileGroupToInstantMap);
     validateClusteringInstant(fileIds2, partitionPath1, clusterTime, fileGroupToInstantMap);
     validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap);
+    Option<HoodieInstant> lastPendingClustering = metaClient.getActiveTimeline().getLastPendingClusterCommit();
+    assertTrue(lastPendingClustering.isPresent());
+    assertEquals("2", lastPendingClustering.get().getTimestamp());
+
+    //check that it still gets picked if it is inflight
+    HoodieInstant inflight = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(lastPendingClustering.get(), Option.empty());
+    assertEquals(HoodieInstant.State.INFLIGHT, inflight.getState());
+    lastPendingClustering = metaClient.reloadActiveTimeline().getLastPendingClusterCommit();
+    assertEquals("2", lastPendingClustering.get().getTimestamp());
+
+    //now that it is complete, the first instant should be picked
+    HoodieInstant complete = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(false, inflight, Option.empty());
+    assertEquals(HoodieInstant.State.COMPLETED, complete.getState());
+    lastPendingClustering = metaClient.reloadActiveTimeline().getLastPendingClusterCommit();
+    assertEquals("1", lastPendingClustering.get().getTimestamp());
   }
 
   // replacecommit.inflight doesn't have clustering plan.

Reply via email to