This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new e9a2c5f0a51 [HUDI-7284] Fix cluster stream sync check (#10501) e9a2c5f0a51 is described below commit e9a2c5f0a5131f82c9020e6f43a42f65136bc65f Author: Jon Vexler <jbvex...@gmail.com> AuthorDate: Fri Jan 19 15:26:08 2024 -0500 [HUDI-7284] Fix cluster stream sync check (#10501) Co-authored-by: Jonathan Vexler <=> --- .../table/timeline/HoodieDefaultTimeline.java | 17 +++--------- .../apache/hudi/common/util/ClusteringUtils.java | 30 ++++++++++++++++------ .../hudi/common/util/TestClusteringUtils.java | 15 +++++++++++ 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index ac0cb83cf15..1918a944fea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.table.timeline; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -517,21 +518,9 @@ public class HoodieDefaultTimeline implements HoodieTimeline { @Override public Option<HoodieInstant> getLastPendingClusterCommit() { - return Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION)) + return Option.fromJavaOptional(filterPendingReplaceTimeline() .getReverseOrderedInstants() - .filter(i -> { - try { - if (!i.isCompleted()) { - HoodieCommitMetadata metadata = 
TimelineUtils.getCommitMetadata(i, this); - return metadata.getOperationType().equals(WriteOperationType.CLUSTER); - } else { - return false; - } - } catch (IOException e) { - LOG.warn("Unable to read commit metadata for " + i + " due to " + e.getMessage()); - return false; - } - }).findFirst()); + .filter(i -> ClusteringUtils.isPendingClusteringInstant(this, i)).findFirst()); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index eea644a0bbc..b7ed15a07af 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -82,12 +82,12 @@ public class ClusteringUtils { /** * Get requested replace metadata from timeline. - * @param metaClient - * @param pendingReplaceInstant - * @return + * @param timeline used to get the bytes stored in the requested replace instant in the timeline + * @param pendingReplaceInstant can be in any state, because it will always be converted to requested state + * @return option of the replace metadata if present, else empty * @throws IOException */ - private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) throws IOException { + private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant) throws IOException { final HoodieInstant requestedInstant; if (!pendingReplaceInstant.isRequested()) { // inflight replacecommit files don't have clustering plan. 
@@ -97,7 +97,7 @@ public class ClusteringUtils { } else { requestedInstant = pendingReplaceInstant; } - Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedInstant); + Option<byte[]> content = timeline.getInstantDetails(requestedInstant); if (!content.isPresent() || content.get().length == 0) { // few operations create requested file without any content. Assume these are not clustering return Option.empty(); @@ -107,13 +107,23 @@ public class ClusteringUtils { /** * Get Clustering plan from timeline. - * @param metaClient + * @param metaClient used to get the active timeline + * @param pendingReplaceInstant can be in any state, because it will always be converted to requested state + * @return option of the replace metadata if present, else empty + */ + public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) { + return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant); + } + + /** + * Get Clustering plan from timeline. 
+ * @param timeline * @param pendingReplaceInstant * @return */ - public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) { + public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant) { try { - Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(metaClient, pendingReplaceInstant); + Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(timeline, pendingReplaceInstant); if (requestedReplaceMetadata.isPresent() && WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType())) { return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.get().getClusteringPlan())); } @@ -235,6 +245,10 @@ public class ClusteringUtils { return getClusteringPlan(metaClient, instant).isPresent(); } + public static boolean isPendingClusteringInstant(HoodieTimeline timeline, HoodieInstant instant) { + return getClusteringPlan(timeline, instant).isPresent(); + } + /** * Returns the earliest instant to retain. * Make sure the clustering instant won't be archived before cleaned, and the earliest inflight clustering instant has a previous commit. 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index fb3adaf1c01..bd8dc707651 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -104,6 +104,21 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
     validateClusteringInstant(fileIds1, partitionPath1, clusterTime1, fileGroupToInstantMap);
     validateClusteringInstant(fileIds2, partitionPath1, clusterTime, fileGroupToInstantMap);
     validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap);
+    Option<HoodieInstant> lastPendingClustering = metaClient.getActiveTimeline().getLastPendingClusterCommit();
+    assertTrue(lastPendingClustering.isPresent());
+    assertEquals("2", lastPendingClustering.get().getTimestamp());
+
+    //check that it still gets picked if it is inflight
+    HoodieInstant inflight = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(lastPendingClustering.get(), Option.empty());
+    assertEquals(HoodieInstant.State.INFLIGHT, inflight.getState());
+    lastPendingClustering = metaClient.reloadActiveTimeline().getLastPendingClusterCommit();
+    assertEquals("2", lastPendingClustering.get().getTimestamp());
+
+    //now that it is complete, the first instant should be picked
+    HoodieInstant complete = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(false, inflight, Option.empty());
+    assertEquals(HoodieInstant.State.COMPLETED, complete.getState());
+    lastPendingClustering = metaClient.reloadActiveTimeline().getLastPendingClusterCommit();
+    assertEquals("1", lastPendingClustering.get().getTimestamp());
   }
 
   // replacecommit.inflight doesn't have clustering plan.