This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f117f22c23 [HUDI-7230] Flink stream read supports skipping insert overwrite instant (#10328)
6f117f22c23 is described below

commit 6f117f22c233e7768b63d7e08bd223f8d9cf80d7
Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com>
AuthorDate: Mon Apr 8 17:22:22 2024 +0800

    [HUDI-7230] Flink stream read supports skipping insert overwrite instant (#10328)
---
 .../table/read/IncrementalQueryAnalyzer.java       |  20 ++-
 .../apache/hudi/common/util/ClusteringUtils.java   |  15 ++
 .../apache/hudi/configuration/FlinkOptions.java    |   8 ++
 .../apache/hudi/source/IncrementalInputSplits.java |  17 ++-
 .../hudi/source/StreamReadMonitoringFunction.java  |   1 +
 .../hudi/source/TestIncrementalInputSplits.java    | 151 +++++++++++++++++++--
 6 files changed, 194 insertions(+), 18 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 3f0eb32c7e5..ca8ae575898 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -111,6 +111,7 @@ public class IncrementalQueryAnalyzer {
   private final InstantRange.RangeType rangeType;
   private final boolean skipCompaction;
   private final boolean skipClustering;
+  private final boolean skipInsertOverwrite;
   private final int limit;
 
   private IncrementalQueryAnalyzer(
@@ -120,6 +121,7 @@ public class IncrementalQueryAnalyzer {
       InstantRange.RangeType rangeType,
       boolean skipCompaction,
       boolean skipClustering,
+      boolean skipInsertOverwrite,
       int limit) {
     this.metaClient = metaClient;
     this.startTime = Option.ofNullable(startTime);
@@ -127,6 +129,7 @@ public class IncrementalQueryAnalyzer {
     this.rangeType = rangeType;
     this.skipCompaction = skipCompaction;
     this.skipClustering = skipClustering;
+    this.skipInsertOverwrite = skipInsertOverwrite;
     this.limit = limit;
   }
 
@@ -206,13 +209,13 @@ public class IncrementalQueryAnalyzer {
 
   private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient) {
     HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering);
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
   }
 
   private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant, false);
     HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+    return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
   }
 
   /**
@@ -223,7 +226,7 @@ public class IncrementalQueryAnalyzer {
    * @return the filtered timeline
    */
   @VisibleForTesting
-  public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering) {
+  public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering, boolean skipInsertOverwrite) {
     final HoodieTimeline oriTimeline = timeline;
     if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & skipCompaction) {
       // the compaction commit uses 'commit' as action which is tricky
@@ -232,6 +235,9 @@ public class IncrementalQueryAnalyzer {
     if (skipClustering) {
       timeline = timeline.filter(instant -> !ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
     }
+    if (skipInsertOverwrite) {
+      timeline = timeline.filter(instant -> !ClusteringUtils.isInsertOverwriteInstant(instant, oriTimeline));
+    }
     return timeline;
   }
 
@@ -254,6 +260,7 @@ public class IncrementalQueryAnalyzer {
     private HoodieTableMetaClient metaClient;
     private boolean skipCompaction = false;
     private boolean skipClustering = false;
+    private boolean skipInsertOverwrite = false;
     /**
      * Maximum number of instants to read per run.
      */
@@ -292,6 +299,11 @@ public class IncrementalQueryAnalyzer {
       return this;
     }
 
+    public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+      this.skipInsertOverwrite = skipInsertOverwrite;
+      return this;
+    }
+
     public Builder limit(int limit) {
       this.limit = limit;
       return this;
@@ -299,7 +311,7 @@ public class IncrementalQueryAnalyzer {
 
     public IncrementalQueryAnalyzer build() {
       return new IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), this.startTime, this.endTime,
-          Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.limit);
+          Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.skipInsertOverwrite, this.limit);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index d041b6bcb8f..64eb27453b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -314,4 +314,19 @@ public class ClusteringUtils {
       throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
     }
   }
+
+  /**
+   * Returns whether the given instant {@code instant} is an insert overwrite operation.
+   */
+  public static boolean isInsertOverwriteInstant(HoodieInstant instant, HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      WriteOperationType opType = TimelineUtils.getCommitMetadata(instant, timeline).getOperationType();
+      return opType.equals(WriteOperationType.INSERT_OVERWRITE) || opType.equals(WriteOperationType.INSERT_OVERWRITE_TABLE);
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
+    }
+  }
 }
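
For context, a minimal usage sketch of the new helper (not part of the commit; the timeline variable is an assumed completed timeline), mirroring how IncrementalQueryAnalyzer drops insert overwrite instants when the option is enabled:

    // Hypothetical snippet: keep only instants that were not written by insert overwrite.
    HoodieTimeline withoutInsertOverwrite =
        timeline.filter(instant -> !ClusteringUtils.isInsertOverwriteInstant(instant, timeline));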
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9a85a46e485..7565ad15300 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -329,6 +329,14 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Whether to skip clustering instants to avoid reading 
base files of clustering operations for streaming read "
           + "to improve read performance.");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_INSERT_OVERWRITE = ConfigOptions
+      .key("read.streaming.skip_insertoverwrite")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to skip insert overwrite instants to avoid 
reading base files of insert overwrite operations for streaming read. "
+          + "In streaming scenarios, insert overwrite is usually used to 
repair data, here you can control the visibility of downstream streaming 
read.");
+
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
       .key("read.start-commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 2b4dec9995c..ddd7fbbb0a8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -91,6 +91,8 @@ public class IncrementalInputSplits implements Serializable {
   private final boolean skipCompaction;
   // skip clustering
   private final boolean skipClustering;
+  // skip insert overwrite
+  private final boolean skipInsertOverwrite;
 
   private IncrementalInputSplits(
       Configuration conf,
@@ -99,7 +101,8 @@ public class IncrementalInputSplits implements Serializable {
       long maxCompactionMemoryInBytes,
       @Nullable PartitionPruners.PartitionPruner partitionPruner,
       boolean skipCompaction,
-      boolean skipClustering) {
+      boolean skipClustering,
+      boolean skipInsertOverwrite) {
     this.conf = conf;
     this.path = path;
     this.rowType = rowType;
@@ -107,6 +110,7 @@ public class IncrementalInputSplits implements Serializable {
     this.partitionPruner = partitionPruner;
     this.skipCompaction = skipCompaction;
     this.skipClustering = skipClustering;
+    this.skipInsertOverwrite = skipInsertOverwrite;
   }
 
   /**
@@ -135,6 +139,7 @@ public class IncrementalInputSplits implements Serializable {
         .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
         .skipCompaction(skipCompaction)
         .skipClustering(skipClustering)
+        .skipInsertOverwrite(skipInsertOverwrite)
         .build();
 
     IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
@@ -241,6 +246,7 @@ public class IncrementalInputSplits implements Serializable {
         .rangeType(issuedOffset != null ? InstantRange.RangeType.OPEN_CLOSED : InstantRange.RangeType.CLOSED_CLOSED)
         .skipCompaction(skipCompaction)
         .skipClustering(skipClustering)
+        .skipInsertOverwrite(skipInsertOverwrite)
         .limit(OptionsResolver.getReadCommitsLimit(conf))
         .build();
 
@@ -498,6 +504,8 @@ public class IncrementalInputSplits implements Serializable {
     private boolean skipCompaction = false;
     // skip clustering
     private boolean skipClustering = false;
+    // skip insert overwrite
+    private boolean skipInsertOverwrite = false;
 
     public Builder() {
     }
@@ -537,10 +545,15 @@ public class IncrementalInputSplits implements Serializable {
       return this;
     }
 
+    public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+      this.skipInsertOverwrite = skipInsertOverwrite;
+      return this;
+    }
+
     public IncrementalInputSplits build() {
       return new IncrementalInputSplits(
           Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
-          this.maxCompactionMemoryInBytes, this.partitionPruner, this.skipCompaction, this.skipClustering);
+          this.maxCompactionMemoryInBytes, this.partitionPruner, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
     }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index fa911cadb0e..0e3b1f0ce58 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -124,6 +124,7 @@ public class StreamReadMonitoringFunction
         .partitionPruner(partitionPruner)
         .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
         .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
+        .skipInsertOverwrite(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
         .build();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 64211608e05..c15e4c628b6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -67,6 +67,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -142,18 +143,20 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
   }
 
   @Test
-  void testFilterInstantsByCondition() throws IOException {
-    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+  void testFilterInstantsByConditionForMOR() throws IOException {
     metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+    HoodieActiveTimeline timelineMOR = metaClient.getActiveTimeline();
 
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // commit1: delta commit
     HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1");
+    timelineMOR.createCompleteInstant(commit1);
+    // commit2: delta commit
     HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
+    timelineMOR.createCompleteInstant(commit2);
+    // commit3: clustering
     HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
-    timeline.createCompleteInstant(commit1);
-    timeline.createCompleteInstant(commit2);
-    timeline.createNewInstant(commit3);
-    commit3 = timeline.transitionReplaceRequestedToInflight(commit3, Option.empty());
+    timelineMOR.createNewInstant(commit3);
+    commit3 = timelineMOR.transitionReplaceRequestedToInflight(commit3, Option.empty());
     HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
             new ArrayList<>(),
             new HashMap<>(),
@@ -161,15 +164,139 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
             WriteOperationType.CLUSTER,
             "",
             HoodieTimeline.REPLACE_COMMIT_ACTION);
-    timeline.transitionReplaceInflightToComplete(true,
+    timelineMOR.transitionReplaceInflightToComplete(true,
         HoodieTimeline.getReplaceCommitInflightInstant(commit3.getTimestamp()),
         serializeCommitMetadata(commitMetadata));
-    timeline = timeline.reload();
+    // commit4: insert overwrite
+    HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4");
+    timelineMOR.createNewInstant(commit4);
+    commit4 = timelineMOR.transitionReplaceRequestedToInflight(commit4, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+            new ArrayList<>(),
+            new HashMap<>(),
+            Option.empty(),
+            WriteOperationType.INSERT_OVERWRITE,
+            "",
+            HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineMOR.transitionReplaceInflightToComplete(true,
+            HoodieTimeline.getReplaceCommitInflightInstant(commit4.getTimestamp()),
+            serializeCommitMetadata(commitMetadata));
+    // commit5: insert overwrite table
+    HoodieInstant commit5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "5");
+    timelineMOR.createNewInstant(commit5);
+    commit5 = timelineMOR.transitionReplaceRequestedToInflight(commit5, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+            new ArrayList<>(),
+            new HashMap<>(),
+            Option.empty(),
+            WriteOperationType.INSERT_OVERWRITE_TABLE,
+            "",
+            HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineMOR.transitionReplaceInflightToComplete(true,
+            HoodieTimeline.getReplaceCommitInflightInstant(commit5.getTimestamp()),
+            serializeCommitMetadata(commitMetadata));
+    // commit6: compaction
+    HoodieInstant commit6 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "6");
+    timelineMOR.createNewInstant(commit6);
+    commit6 = timelineMOR.transitionCompactionRequestedToInflight(commit6);
+    commit6 = timelineMOR.transitionCompactionInflightToComplete(false, commit6, Option.empty());
+    timelineMOR.createCompleteInstant(commit6);
+    timelineMOR = timelineMOR.reload();
+
+    // will not filter commits by default
+    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, false, false);
+    assertEquals(6, resTimeline.getInstants().size());
+
+    // filter cluster commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, true, false);
+    assertEquals(5, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit3));
+
+    // filter compaction commits for mor table
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, true, false, false);
+    assertFalse(resTimeline.containsInstant(commit6));
+
+    // filter insert overwrite commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, false, true);
+    assertEquals(4, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit4));
+    assertFalse(resTimeline.containsInstant(commit5));
+  }
+
+  @Test
+  void testFilterInstantsByConditionForCOW() throws IOException {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    HoodieActiveTimeline timelineCOW = metaClient.getActiveTimeline();
+
+    // commit1: commit
+    HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    timelineCOW.createCompleteInstant(commit1);
+    // commit2: commit
+    HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "2");
+    timelineCOW.createCompleteInstant(commit2);
+    // commit3: clustering
+    HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
+    timelineCOW.createNewInstant(commit3);
+    commit3 = timelineCOW.transitionReplaceRequestedToInflight(commit3, Option.empty());
+    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
+            new ArrayList<>(),
+            new HashMap<>(),
+            Option.empty(),
+            WriteOperationType.CLUSTER,
+            "",
+            HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineCOW.transitionReplaceInflightToComplete(true,
+            HoodieTimeline.getReplaceCommitInflightInstant(commit3.getTimestamp()),
+            serializeCommitMetadata(commitMetadata));
+    // commit4: insert overwrite
+    HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4");
+    timelineCOW.createNewInstant(commit4);
+    commit4 = timelineCOW.transitionReplaceRequestedToInflight(commit4, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+            new ArrayList<>(),
+            new HashMap<>(),
+            Option.empty(),
+            WriteOperationType.INSERT_OVERWRITE,
+            "",
+            HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineCOW.transitionReplaceInflightToComplete(true,
+            HoodieTimeline.getReplaceCommitInflightInstant(commit4.getTimestamp()),
+            serializeCommitMetadata(commitMetadata));
+    // commit5: insert overwrite table
+    HoodieInstant commit5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "5");
+    timelineCOW.createNewInstant(commit5);
+    commit5 = timelineCOW.transitionReplaceRequestedToInflight(commit5, Option.empty());
+    commitMetadata = CommitUtils.buildMetadata(
+            new ArrayList<>(),
+            new HashMap<>(),
+            Option.empty(),
+            WriteOperationType.INSERT_OVERWRITE_TABLE,
+            "",
+            HoodieTimeline.REPLACE_COMMIT_ACTION);
+    timelineCOW.transitionReplaceInflightToComplete(true,
+            HoodieTimeline.getReplaceCommitInflightInstant(commit5.getTimestamp()),
+            serializeCommitMetadata(commitMetadata));
+
+    timelineCOW = timelineCOW.reload();
+
+    // will not filter commits by default
+    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, false, false);
+    assertEquals(5, resTimeline.getInstants().size());
+
+    // filter cluster commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, true, false);
+    assertEquals(4, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit3));
+
+    // cow table skip-compact does not take effect (if it did, normal commits would also be filtered out)
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, true, false, false);
+    assertEquals(5, resTimeline.getInstants().size());
 
-    conf.set(FlinkOptions.READ_END_COMMIT, "3");
-    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timeline, false, false);
-    // will not filter cluster commit by default
+    // filter insert overwrite commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, false, true);
     assertEquals(3, resTimeline.getInstants().size());
+    assertFalse(resTimeline.containsInstant(commit4));
+    assertFalse(resTimeline.containsInstant(commit5));
   }
 
   @Test
