This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 6f117f22c23  [HUDI-7230] Flink stream read supports skipping insert overwrite instant (#10328)
6f117f22c23 is described below

commit 6f117f22c233e7768b63d7e08bd223f8d9cf80d7
Author:     zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com>
AuthorDate: Mon Apr 8 17:22:22 2024 +0800

    [HUDI-7230] Flink stream read supports skipping insert overwrite instant (#10328)
---
 .../table/read/IncrementalQueryAnalyzer.java       |  20 ++-
 .../apache/hudi/common/util/ClusteringUtils.java   |  15 ++
 .../apache/hudi/configuration/FlinkOptions.java    |   8 ++
 .../apache/hudi/source/IncrementalInputSplits.java |  17 ++-
 .../hudi/source/StreamReadMonitoringFunction.java  |   1 +
 .../hudi/source/TestIncrementalInputSplits.java    | 151 +++++++++++++++++++--
 6 files changed, 194 insertions(+), 18 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 3f0eb32c7e5..ca8ae575898 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -111,6 +111,7 @@ public class IncrementalQueryAnalyzer {
   private final InstantRange.RangeType rangeType;
   private final boolean skipCompaction;
   private final boolean skipClustering;
+  private final boolean skipInsertOverwrite;
   private final int limit;
 
   private IncrementalQueryAnalyzer(
@@ -120,6 +121,7 @@
       InstantRange.RangeType rangeType,
       boolean skipCompaction,
       boolean skipClustering,
+      boolean skipInsertOverwrite,
       int limit) {
     this.metaClient = metaClient;
     this.startTime = Option.ofNullable(startTime);
@@ -127,6 +129,7 @@
     this.rangeType = rangeType;
     this.skipCompaction = skipCompaction;
     this.skipClustering = skipClustering;
+    this.skipInsertOverwrite = skipInsertOverwrite;
     this.limit = limit;
   }
 
@@ -206,13 +209,13 @@
 
   private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient) {
     HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering);
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
   }
 
   private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant, false);
     HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+    return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
   }
 
   /**
@@ -223,7 +226,7 @@
    * @return the filtered timeline
    */
   @VisibleForTesting
-  public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering) {
+  public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering, boolean skipInsertOverwrite) {
     final HoodieTimeline oriTimeline = timeline;
     if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && skipCompaction) {
       // the compaction commit uses 'commit' as action which is tricky
@@ -232,6 +235,9 @@
     if (skipClustering) {
       timeline = timeline.filter(instant -> !ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
     }
+    if (skipInsertOverwrite) {
+      timeline = timeline.filter(instant -> !ClusteringUtils.isInsertOverwriteInstant(instant, oriTimeline));
+    }
     return timeline;
   }
 
@@ -254,6 +260,7 @@ public class IncrementalQueryAnalyzer {
     private HoodieTableMetaClient metaClient;
     private boolean skipCompaction = false;
     private boolean skipClustering = false;
+    private boolean skipInsertOverwrite = false;
     /**
      * Maximum number of instants to read per run.
      */
@@ -292,6 +299,11 @@
       return this;
     }
 
+    public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+      this.skipInsertOverwrite = skipInsertOverwrite;
+      return this;
+    }
+
     public Builder limit(int limit) {
       this.limit = limit;
       return this;
@@ -299,7 +311,7 @@
 
     public IncrementalQueryAnalyzer build() {
       return new IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), this.startTime, this.endTime,
-          Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.limit);
+          Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.skipInsertOverwrite, this.limit);
     }
   }
 }
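For orientation, here is a minimal sketch (not part of the commit) of how the extended analyzer builder is driven. The metaClient() setter and a pre-built HoodieTableMetaClient are assumed; rangeType(), the skip flags, build() and analyze() all appear in the diffs in this mail:

    // Sketch: analyze an incremental query that hides insert overwrite instants.
    IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
        .metaClient(metaClient)                          // assumed: meta client built elsewhere
        .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
        .skipCompaction(false)
        .skipClustering(false)
        .skipInsertOverwrite(true)                       // the flag added by this commit
        .build();
    IncrementalQueryAnalyzer.QueryContext queryContext = analyzer.analyze();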
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index d041b6bcb8f..64eb27453b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -314,4 +314,19 @@ public class ClusteringUtils {
       throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
     }
   }
+
+  /**
+   * Returns whether the given instant {@code instant} represents an insert overwrite operation.
+   */
+  public static boolean isInsertOverwriteInstant(HoodieInstant instant, HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      WriteOperationType opType = TimelineUtils.getCommitMetadata(instant, timeline).getOperationType();
+      return opType.equals(WriteOperationType.INSERT_OVERWRITE) || opType.equals(WriteOperationType.INSERT_OVERWRITE_TABLE);
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
+    }
+  }
 }
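The new helper only resolves commit metadata for replace commits, so it is cheap to apply as a timeline filter, which is exactly how the analyzer uses it. A one-line usage sketch, assuming a HoodieTimeline variable named timeline:

    // Sketch: drop insert overwrite instants, mirroring the analyzer filter above.
    HoodieTimeline filtered =
        timeline.filter(instant -> !ClusteringUtils.isInsertOverwriteInstant(instant, timeline));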
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9a85a46e485..7565ad15300 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -329,6 +329,14 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Whether to skip clustering instants to avoid reading base files of clustering operations for streaming read "
           + "to improve read performance.");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_INSERT_OVERWRITE = ConfigOptions
+      .key("read.streaming.skip_insertoverwrite")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to skip insert overwrite instants to avoid reading base files of insert overwrite operations for streaming read. "
+          + "In streaming scenarios, insert overwrite is usually used to repair data; this option controls whether such instants are visible to downstream streaming reads.");
+
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
       .key("read.start-commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 2b4dec9995c..ddd7fbbb0a8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -91,6 +91,8 @@ public class IncrementalInputSplits implements Serializable {
   private final boolean skipCompaction;
   // skip clustering
   private final boolean skipClustering;
+  // skip insert overwrite
+  private final boolean skipInsertOverwrite;
 
   private IncrementalInputSplits(
       Configuration conf,
@@ -99,7 +101,8 @@
       long maxCompactionMemoryInBytes,
       @Nullable PartitionPruners.PartitionPruner partitionPruner,
       boolean skipCompaction,
-      boolean skipClustering) {
+      boolean skipClustering,
+      boolean skipInsertOverwrite) {
     this.conf = conf;
     this.path = path;
     this.rowType = rowType;
@@ -107,6 +110,7 @@
     this.partitionPruner = partitionPruner;
     this.skipCompaction = skipCompaction;
     this.skipClustering = skipClustering;
+    this.skipInsertOverwrite = skipInsertOverwrite;
   }
 
   /**
@@ -135,6 +139,7 @@
         .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
         .skipCompaction(skipCompaction)
         .skipClustering(skipClustering)
+        .skipInsertOverwrite(skipInsertOverwrite)
         .build();
     IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
@@ -241,6 +246,7 @@
         .rangeType(issuedOffset != null ? InstantRange.RangeType.OPEN_CLOSED : InstantRange.RangeType.CLOSED_CLOSED)
         .skipCompaction(skipCompaction)
         .skipClustering(skipClustering)
+        .skipInsertOverwrite(skipInsertOverwrite)
         .limit(OptionsResolver.getReadCommitsLimit(conf))
         .build();
 
@@ -498,6 +504,8 @@
     private boolean skipCompaction = false;
     // skip clustering
     private boolean skipClustering = false;
+    // skip insert overwrite
+    private boolean skipInsertOverwrite = false;
 
     public Builder() {
     }
@@ -537,10 +545,15 @@
       return this;
     }
 
+    public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+      this.skipInsertOverwrite = skipInsertOverwrite;
+      return this;
+    }
+
     public IncrementalInputSplits build() {
       return new IncrementalInputSplits(
           Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
-          this.maxCompactionMemoryInBytes, this.partitionPruner, this.skipCompaction, this.skipClustering);
+          this.maxCompactionMemoryInBytes, this.partitionPruner, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
     }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index fa911cadb0e..0e3b1f0ce58 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -124,6 +124,7 @@ public class StreamReadMonitoringFunction
         .partitionPruner(partitionPruner)
         .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
         .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
+        .skipInsertOverwrite(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
         .build();
   }
 
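From a user's perspective the feature is driven purely by configuration; a sketch of enabling it programmatically, assuming the pre-existing READ_STREAMING_ENABLED option and Flink's Configuration#setBoolean(ConfigOption, boolean) overload:

    // Sketch: enable streaming read and hide insert overwrite instants from it.
    org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
    conf.setBoolean(FlinkOptions.READ_STREAMING_ENABLED, true);               // assumed pre-existing option
    conf.setBoolean(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE, true); // option added by this commit

In SQL this corresponds to setting the table option 'read.streaming.skip_insertoverwrite' = 'true'.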
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 64211608e05..c15e4c628b6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -67,6 +67,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -142,18 +143,20 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
   }
 
   @Test
-  void testFilterInstantsByCondition() throws IOException {
-    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+  void testFilterInstantsByConditionForMOR() throws IOException {
     metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieActiveTimeline timelineMOR = metaClient.getActiveTimeline();
+    // commit1: delta commit
     HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1");
+    timelineMOR.createCompleteInstant(commit1);
+    // commit2: delta commit
     HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
+    timelineMOR.createCompleteInstant(commit2);
+    // commit3: clustering
     HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
-    timeline.createCompleteInstant(commit1);
-    timeline.createCompleteInstant(commit2);
-    timeline.createNewInstant(commit3);
-    commit3 = timeline.transitionReplaceRequestedToInflight(commit3, Option.empty());
+    timelineMOR.createNewInstant(commit3);
+    commit3 = timelineMOR.transitionReplaceRequestedToInflight(commit3, Option.empty());
     HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
         new ArrayList<>(),
         new HashMap<>(),
@@ -161,15 +164,139 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
         WriteOperationType.CLUSTER,
         "",
         HoodieTimeline.REPLACE_COMMIT_ACTION);
-    timeline.transitionReplaceInflightToComplete(true,
+    timelineMOR.transitionReplaceInflightToComplete(true,
         HoodieTimeline.getReplaceCommitInflightInstant(commit3.getTimestamp()),
         serializeCommitMetadata(commitMetadata));
-    timeline = timeline.reload();
+    // commit4: insert overwrite
+    HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4");
+    timelineMOR.createNewInstant(commit4);
+    commit4 = timelineMOR.transitionReplaceRequestedToInflight(commit4, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+        new ArrayList<>(),
+        new HashMap<>(),
+        Option.empty(),
+        WriteOperationType.INSERT_OVERWRITE,
+        "",
+        HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineMOR.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit4.getTimestamp()),
+        serializeCommitMetadata(commitMetadata));
+    // commit5: insert overwrite table
+    HoodieInstant commit5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "5");
+    timelineMOR.createNewInstant(commit5);
+    commit5 = timelineMOR.transitionReplaceRequestedToInflight(commit5, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+        new ArrayList<>(),
+        new HashMap<>(),
+        Option.empty(),
+        WriteOperationType.INSERT_OVERWRITE_TABLE,
+        "",
+        HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineMOR.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit5.getTimestamp()),
+        serializeCommitMetadata(commitMetadata));
+    // commit6: compaction
+    HoodieInstant commit6 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "6");
+    timelineMOR.createNewInstant(commit6);
+    commit6 = timelineMOR.transitionCompactionRequestedToInflight(commit6);
+    commit6 = timelineMOR.transitionCompactionInflightToComplete(false, commit6, Option.empty());
+    timelineMOR.createCompleteInstant(commit6);
+    timelineMOR = timelineMOR.reload();
+
+    // will not filter commits by default
+    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, false, false);
+    assertEquals(6, resTimeline.getInstants().size());
+
+    // filter clustering commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, true, false);
+    assertEquals(5, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit3));
+
+    // filter compaction commits for the MOR table
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, true, false, false);
+    assertFalse(resTimeline.containsInstant(commit6));
+
+    // filter insert overwrite commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, false, true);
+    assertEquals(4, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit4));
+    assertFalse(resTimeline.containsInstant(commit5));
+  }
+
+  @Test
+  void testFilterInstantsByConditionForCOW() throws IOException {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    HoodieActiveTimeline timelineCOW = metaClient.getActiveTimeline();
+
+    // commit1: commit
+    HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    timelineCOW.createCompleteInstant(commit1);
+    // commit2: commit
+    HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "2");
+    timelineCOW.createCompleteInstant(commit2);
+    // commit3: clustering
+    HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
+    timelineCOW.createNewInstant(commit3);
+    commit3 = timelineCOW.transitionReplaceRequestedToInflight(commit3, Option.empty());
+    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
+        new ArrayList<>(),
+        new HashMap<>(),
+        Option.empty(),
+        WriteOperationType.CLUSTER,
+        "",
+        HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineCOW.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit3.getTimestamp()),
+        serializeCommitMetadata(commitMetadata));
+    // commit4: insert overwrite
+    HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4");
+    timelineCOW.createNewInstant(commit4);
+    commit4 = timelineCOW.transitionReplaceRequestedToInflight(commit4, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+        new ArrayList<>(),
+        new HashMap<>(),
+        Option.empty(),
+        WriteOperationType.INSERT_OVERWRITE,
+        "",
+        HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineCOW.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit4.getTimestamp()),
+        serializeCommitMetadata(commitMetadata));
+    // commit5: insert overwrite table
+    HoodieInstant commit5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "5");
+    timelineCOW.createNewInstant(commit5);
+    commit5 = timelineCOW.transitionReplaceRequestedToInflight(commit5, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+        new ArrayList<>(),
+        new HashMap<>(),
+        Option.empty(),
+        WriteOperationType.INSERT_OVERWRITE_TABLE,
+        "",
+        HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineCOW.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit5.getTimestamp()),
+        serializeCommitMetadata(commitMetadata));
+
+    timelineCOW = timelineCOW.reload();
+
+    // will not filter commits by default
+    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, false, false);
+    assertEquals(5, resTimeline.getInstants().size());
+
+    // filter clustering commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, true, false);
+    assertEquals(4, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit3));
+
+    // skip-compact does not take effect for a COW table (it would also filter out normal commits, which use the same 'commit' action)
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, true, false, false);
+    assertEquals(5, resTimeline.getInstants().size());
 
-    conf.set(FlinkOptions.READ_END_COMMIT, "3");
-    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timeline, false, false);
-    // will not filter cluster commit by default
+    // filter insert overwrite commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, false, true);
     assertEquals(3, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit4));
+    assertFalse(resTimeline.containsInstant(commit5));
   }
 
   @Test