This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 38e5a35e76e9def6acfa1db5fa2858ecedccc867 Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Mon Dec 12 07:28:22 2022 -0800 [HUDI-5078] Fixing isTableService for replace commits (#7037) --- .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- .../java/org/apache/hudi/table/HoodieTable.java | 19 ++++++++++++++++-- .../hudi/table/action/BaseActionExecutor.java | 2 +- .../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +- .../hudi/table/HoodieFlinkCopyOnWriteTable.java | 6 ------ .../hudi/table/HoodieFlinkMergeOnReadTable.java | 6 ------ .../hudi/table/HoodieJavaCopyOnWriteTable.java | 6 ------ .../hudi/table/HoodieJavaMergeOnReadTable.java | 6 ------ .../apache/hudi/client/SparkRDDWriteClient.java | 2 +- .../hudi/table/HoodieSparkCopyOnWriteTable.java | 5 ----- .../hudi/table/HoodieSparkMergeOnReadTable.java | 5 ----- .../apache/hudi/io/TestHoodieTimelineArchiver.java | 23 ++++++++++++++++++++++ .../hudi/common/table/timeline/HoodieInstant.java | 2 +- .../table/timeline/TestHoodieActiveTimeline.java | 2 +- 14 files changed, 46 insertions(+), 42 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 609f85e27fe..011235e4369 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -348,7 +348,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName()); table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, - table.isTableServiceAction(actionType))); + table.isTableServiceAction(actionType, instantTime))); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 77b24a38953..7a9211a7c57 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.fs.OptimisticConsistencyGuard; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -58,6 +59,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -823,10 +825,23 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem /** * Check if action type is a table service. - * @param actionType action type of interest. + * @param actionType action type of the instant + * @param instantTime instant time of the instant. * @return true if action represents a table service. false otherwise. */ - public abstract boolean isTableServiceAction(String actionType); + public boolean isTableServiceAction(String actionType, String instantTime) { + if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime)); + // only clustering is table service with replace commit action + return instantPlan.isPresent(); + } else { + if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } else { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + } + } /** * Get Table metadata writer. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index f893b4ccd5c..d2b2cef2f60 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -58,7 +58,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O, */ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { table.getMetadataWriter(instantTime).ifPresent(w -> w.update( - metadata, instantTime, table.isTableServiceAction(actionType))); + metadata, instantTime, table.isTableServiceAction(actionType, instantTime))); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 33148cddb11..b4003712c56 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -289,7 +289,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends // the schema expects to be immutable for SQL jobs but may be not for non-SQL // jobs. this.metadataWriter.initTableMetadata(); - this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); + this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime)); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 2c8a3c4e49c..543751a0410 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -93,11 +92,6 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } - /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index aa8adde7353..9b7d3447177 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -58,11 +57,6 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload> super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - @Override public HoodieWriteMetadata<List<WriteStatus>> upsert( HoodieEngineContext context, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 88921334980..342c018e5a2 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -90,11 +89,6 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } - @Override public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index 32d30f704ec..5af29502a95 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -43,11 +42,6 @@ public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends H super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - @Override public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index ea7a660c379..fa568a7a6fe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -425,7 +425,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata, HoodieInstant hoodieInstant) { - boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. table.getMetadataWriter(hoodieInstant.getTimestamp()) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index a88ca65c35a..743aff51a12 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -103,11 +103,6 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } - @Override public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) { return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index efc667af297..c9d74246319 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -79,11 +79,6 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - @Override public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) { return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 76b51e6970b..72aba1f1638 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -281,6 +281,29 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { } } + @Test + public void testArchiveTableWithReplaceCommits() throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2); + for (int i = 1; i < 7; i++) { + if (i < 3) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), + Arrays.asList("p1", "p2"), 2); + } else { + testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + // trigger archival + Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig); + List<HoodieInstant> originalCommits = commitsList.getKey(); + List<HoodieInstant> commitsAfterArchival = commitsList.getValue(); + + if (i == 6) { + // after all rounds, only 3 should be left in active timeline. 4,5,6 + assertEquals(originalCommits, commitsAfterArchival); + assertEquals(3, originalCommits.size()); + } + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 8b1cb875c09..bd29e2d6a2f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -68,7 +68,7 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> { // Committed instant COMPLETED, // Invalid instant - INVALID + NIL } private State state = State.COMPLETED; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index e40341a2033..b418c89f949 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -663,7 +663,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { List<HoodieInstant> allInstants = new ArrayList<>(); long instantTime = 1; for (State state : State.values()) { - if (state == State.INVALID) { + if (state == State.NIL) { continue; } for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {