This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e5034d35462 fix(flink): fix data loss in stream read from earliest 
(#18848)
4e5034d35462 is described below

commit 4e5034d354628bab348a55d694171632a0c7c2a0
Author: fhan <[email protected]>
AuthorDate: Mon Jun 1 17:13:07 2026 +0800

    fix(flink): fix data loss in stream read from earliest (#18848)
    
    * fix(flink): fix data loss in stream read from earliest
    * fix(flink): optimize UTs and refine de-duplicate full-table-scan timeline 
comment
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../apache/hudi/source/IncrementalInputSplits.java |  35 ++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 281 +++++++++++++++++++++
 2 files changed, 313 insertions(+), 3 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index bc6ebd29da21..344b24bc10c4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -184,7 +184,12 @@ public class IncrementalInputSplits implements 
Serializable {
         return Result.EMPTY;
       }
       fileInfoList = fileIndex.getFilesInPartitions();
-      List<FileSlice> allFileSlices = getFileSlices(metaClient, 
commitTimeline, readPartitions, fileInfoList, 
analyzingResult.getMaxCompletionTime(), false);
+      // Use the full commits-and-compaction timeline rather than the 
(possibly compaction-filtered)
+      // activeTimeline carried by the QueryContext. Otherwise, on a MOR table 
with
+      // 'read.streaming.skip_compaction = true', file slice boundaries would 
be wrongly
+      // computed and log files could be missed, causing data loss.
+      List<FileSlice> allFileSlices = getFileSlices(metaClient, 
getFullCommitsTimeline(metaClient),
+          readPartitions, fileInfoList, 
analyzingResult.getMaxCompletionTime(), false);
       fileSlices = fileIndex.filterFileSlices(allFileSlices);
     } else {
       if (cdcEnabled) {
@@ -217,7 +222,11 @@ public class IncrementalInputSplits implements 
Serializable {
           return Result.EMPTY;
         }
         fileInfoList = fileIndex.getFilesInPartitions();
-        List<FileSlice> allFileSlices = getFileSlices(metaClient, 
commitTimeline, readPartitions, fileInfoList, 
analyzingResult.getMaxCompletionTime(), false);
+        // Same reason as the full-table-scan branch above: build the 
FileSystemView with the
+        // complete commits-and-compaction timeline to avoid losing data when 
'skip_compaction'
+        // is enabled.
+        List<FileSlice> allFileSlices = getFileSlices(metaClient, 
getFullCommitsTimeline(metaClient),
+            readPartitions, fileInfoList, 
analyzingResult.getMaxCompletionTime(), false);
         fileSlices = fileIndex.filterFileSlices(allFileSlices);
       } else {
         fileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, 
files, analyzingResult.getMaxCompletionTime(), false);
@@ -295,7 +304,10 @@ public class IncrementalInputSplits implements 
Serializable {
         log.warn("No files found for reading under path: {}", path);
         return Result.EMPTY;
       }
-      List<FileSlice> allFileSlices = getFileSlices(metaClient, 
commitTimeline, readPartitions, pathInfoList, offsetToIssue, false);
+      // Same reason as the batch full-table-scan branch:
+      // see getFullCommitsTimeline() for why a compaction-filtered timeline 
must not be used here.
+      List<FileSlice> allFileSlices = getFileSlices(metaClient, 
getFullCommitsTimeline(metaClient),
+          readPartitions, pathInfoList, offsetToIssue, false);
       List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
 
       List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices, 
metaClient, endInstant, null);
@@ -391,6 +403,23 @@ public class IncrementalInputSplits implements 
Serializable {
     return getInputSplits(fileSlices, metaClient, endInstant, instantRange);
   }
 
+  /**
+   * Returns the full commit timeline (including completed compaction 
instants) for building
+   * a {@link HoodieTableFileSystemView} during full table scan.
+   *
+   * <p>NOTE: when streaming/batch read enables {@code skip_compaction}, the 
{@code activeTimeline}
+   * carried by {@link IncrementalQueryAnalyzer.QueryContext} has already 
filtered out the
+   * compaction instants. Using such a partial timeline to construct a {@link 
HoodieTableFileSystemView}
+   * would mis-classify the file slice boundaries on a MOR table (since file 
slice boundaries
+   * are derived from compaction instants), leading to data loss when reading 
from the earliest
+   * commit or when the start commit has been archived. For a full table scan 
we should always rely on the
+   * complete commits-and-compaction timeline; the {@code skip_compaction} 
semantics are preserved
+   * by the instant range filtering applied later on the generated input 
splits.
+   */
+  private static HoodieTimeline getFullCommitsTimeline(HoodieTableMetaClient 
metaClient) {
+    return 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+  }
+
   private List<FileSlice> getFileSlices(
       HoodieTableMetaClient metaClient,
       HoodieTimeline commitTimeline,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 168c1b893beb..5591d0461b6f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -28,16 +28,21 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.buffer.BufferMemoryType;
 import org.apache.hudi.sink.buffer.BufferType;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
 import org.apache.hudi.table.catalog.HoodieHiveCatalog;
 import org.apache.hudi.util.StreamerUtil;
@@ -89,6 +94,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -712,6 +719,280 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  /**
+   * Regression test for HUDI: data loss in stream read from earliest when
+   * {@code read.streaming.skip_compaction = true} on a MOR table with 
completed
+   * compaction commits. Covers the streaming earliest full table scan branch 
in
+   * {@link 
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
 String, boolean)}.
+   *
+   * <p>Triggering condition:
+   * <ul>
+   *   <li>{@code read.start-commit = earliest} (no instant range -> full 
table scan path);</li>
+   *   <li>{@code read.streaming.skip_compaction = true} (active timeline 
filtered out compaction);</li>
+   *   <li>MOR table with at least one completed compaction commit that 
produced a
+   *       new base file from existing log files.</li>
+   * </ul>
+   *
+   * <p>Construction:
+   * <ol>
+   *   <li>Offline write {@code DATA_SET_INSERT} (8 records, ids 1..8) and then
+   *       {@code DATA_SET_UPDATE_INSERT} (8 records, where ids 1..5 update 
existing keys
+   *       and ids 9..11 are new) via {@link TestData#writeDataAsBatch}, which 
deterministically
+   *       triggers an inline compaction once {@code COMPACTION_DELTA_COMMITS 
= 1} +
+   *       {@code COMPACTION_ASYNC_ENABLED = true} are set. After this step 
the table has
+   *       both a base file (from compaction) and log files written by the 
UPDATE batch.</li>
+   *   <li>Streaming read from earliest with {@code skip_compaction = true} 
and wait until
+   *       the expected number of merged rows is received. Without the fix, 
the FS view used
+   *       in the earliest full-table-scan branch is built from a 
compaction-filtered
+   *       timeline, file slice boundaries are wrongly computed, log files are 
missed
+   *       and the read will never reach the expected row count (the test 
would time out).</li>
+   * </ol>
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testStreamReadMorTableWithCompactionFromEarliest(boolean useSourceV2) 
throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+    // mandatory for writeDataAsBatch#inlineCompaction to actually run a 
compaction
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+    // Step 1: offline-write two batches with deterministic inline compaction 
in between.
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    TestData.writeDataAsBatch(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // Step 2: streaming read from earliest with skip_compaction = true.
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+        // skip compaction instant -> active timeline drops compaction commit
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    // After the UPDATE batch, the merged result must contain all up-to-date 
records:
+    //   - 5 updated records   (id1..id5 from DATA_SET_UPDATE_INSERT)
+    //   - 3 carried-over records (id6, id7, id8 from DATA_SET_INSERT, not 
touched by UPDATE)
+    //   - 3 newly inserted records (id9, id10, id11 from 
DATA_SET_UPDATE_INSERT)
+    // i.e. 11 records in total. Without the fix the streaming read would 
never reach
+    // expectedNum = 11 and the test would time out via the CollectSink.
+    final int expectedNum = 11;
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * 
from t1", expectedNum);
+    assertEquals(expectedNum, rows.size(),
+        "Expect 11 up-to-date records to be visible after earliest streaming 
read"
+            + " with skip_compaction on a MOR table that has a completed 
compaction commit"
+            + ", actual rows: " + rows);
+  }
+
+  /**
+   * Regression test for HUDI: data loss in batch read from earliest when
+   * {@code read.streaming.skip_compaction = true} on a MOR table with 
completed
+   * compaction commits. Covers the batch full-table-scan branch in
+   * {@link 
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
 boolean)}.
+   *
+   * <p>This complements {@link 
#testStreamReadMorTableWithCompactionFromEarliest(boolean)}
+   * which only exercises the streaming code path. Without the fix, building 
the
+   * {@link org.apache.hudi.common.table.view.HoodieTableFileSystemView} with a
+   * compaction-filtered timeline would mis-classify file slice boundaries and
+   * lose log files.
+   */
+  @Test
+  void testBatchReadMorTableWithCompactionFromEarliest() throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+    // mandatory for writeDataAsBatch#inlineCompaction to actually run a 
compaction
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+    // Offline-write two batches against overlapping record keys, the 2nd 
write triggers
+    // an inline compaction that merges existing log files into a new base 
file - exactly
+    // the scenario that exposes the buggy file-slice classification when 
skip_compaction
+    // is enabled.
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    TestData.writeDataAsBatch(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+        .option(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST)
+        // skip compaction instant -> active timeline drops compaction commit
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+        .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select * from t1").collect());
+    // After update, the merged result must contain all up-to-date records:
+    //   - 5 updated records   (id1..id5 from DATA_SET_UPDATE_INSERT)
+    //   - 3 carried-over records (id6, id7, id8 from DATA_SET_INSERT, not 
touched by UPDATE)
+    //   - 3 newly inserted records (id9, id10, id11 from 
DATA_SET_UPDATE_INSERT)
+    // i.e. 11 records in total. Without the fix, log files belonging to the 
file slice prior
+    // to the inline compaction would be silently dropped by the file system 
view because
+    // the active timeline filtered out the compaction commit, and the result 
size would be
+    // smaller than 11.
+    assertEquals(11, result.size(),
+        "Expect all up-to-date records to be visible after earliest + 
skip_compaction batch read"
+            + ", actual rows: " + result);
+  }
+
+  /**
+   * Regression test for HUDI: data loss when the start commit has been 
archived
+   * and {@code read.streaming.skip_compaction = true} on a MOR table.
+   * Covers the batch "fallback to full table scan" branch in
+   * {@link 
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
 boolean)}
+   * which is reached when {@code hasArchivedInstants == true}.
+   *
+   * <p>Construction:
+   * <ol>
+   *   <li>Write 10 delta-commit batches of {@code (id1,id2), (id3,id4), ...} 
on a MOR table
+   *       so that each batch only inserts new keys (clear, predictable 
per-commit semantics).</li>
+   *   <li>Trigger one completed compaction commit by issuing an extra UPDATE 
batch on
+   *       {@code id1..id4} with {@code COMPACTION_DELTA_COMMITS = 1} via
+   *       {@link TestData#writeDataAsBatch} (which explicitly calls {@code 
inlineCompaction()}).
+   *       This creates exactly the file-slice boundary that the buggy FS view 
would
+   *       mis-classify.</li>
+   *   <li>Pick the LAST archived delta-commit as {@code read.start-commit} 
(filtered by
+   *       {@code action = deltacommit} to exclude any archived compaction 
{@code commit}
+   *       instants). This is deterministic regardless of how many delta 
commits were
+   *       archived by the cleaner+archiver and routes the reader through the
+   *       "archived start commit -> fullTableScan" branch.</li>
+   *   <li>Read with {@code skip_compaction = true} and assert on the SET of 
record-keys
+   *       in the result (not just on count). The expected key set is derived 
dynamically
+   *       from the timeline: every delta_commit whose requested instant time 
is &gt;= the chosen
+   *       start_commit contributes its written ids, plus id1..id4 from the 
UPDATE batch
+   *       are always present because the UPDATE is the latest write. Without 
the fix,
+   *       log files of the file slice that straddles the compaction commit 
are silently
+   *       dropped, so some of these ids would be missing.</li>
+   * </ol>
+   */
+  @Test
+  void testBatchReadMorTableWithCompactionStartCommitArchived() throws 
Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+    // aggressive archival to force older instants out of the active timeline
+    conf.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 4);
+    conf.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 5);
+    conf.set(FlinkOptions.CLEAN_RETAIN_COMMITS, 3);
+    conf.setString("hoodie.commits.archival.batch", "1");
+
+    // Step 1: write 10 batches of 2 new records each -> 10 delta_commit 
instants, 20 distinct keys.
+    for (int i = 0; i < 20; i += 2) {
+      List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+      TestData.writeData(dataset, conf);
+    }
+
+    // Step 2: trigger at least one completed compaction commit by issuing one 
more delta_commit
+    // that UPDATES the very first record keys (id1..id4) and enabling 
COMPACTION_DELTA_COMMITS=1.
+    // The update writes new log files for the file group that contains 
id1..id4, and the inline
+    // compaction merges them into a new base file -> a real compaction 
file-slice boundary.
+    // NOTE: use writeDataAsBatch (which explicitly calls inlineCompaction()), 
since the plain
+    // writeData helper does not run the compaction even with 
COMPACTION_DELTA_COMMITS=1.
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2, 3, 4), conf);
+
+    // Step 3: list the full timeline in one shot to map start_commit -> 
expected id set.
+    // Delta_commit instants are strictly monotonically increasing, so the 
sorted list of all
+    // delta_commits across active + archived timelines gives a 1:1 mapping to 
the 10 batches
+    // written in Step 1: the k-th (0-based) delta_commit wrote id_{2k+1} and id_{2k+2}.
+    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new 
Configuration())),
+        tempFile.getAbsolutePath());
+    // Use the merged (archived + active) timeline to capture all 
delta_commits,
+    // even those that may have been archived by the aggressive archival 
settings.
+    List<String> batchInstantTimes = TimelineUtils.getTimeline(metaClient, 
true)
+        .getCommitsTimeline().filterCompletedInstants()
+        .filter(instant -> 
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
+        
.getInstantsAsStream().map(HoodieInstant::requestedTime).collect(Collectors.toList());
+    // Step 1 produced exactly 10 delta_commits; the 11th (if present) is from 
Step 2 UPDATE.
+    // Keep only the first 10 to build the batch-index -> id mapping.
+    assertTrue(batchInstantTimes.size() >= 10,
+        "Expected at least 10 delta_commits from Step 1, got " + 
batchInstantTimes.size());
+    batchInstantTimes = batchInstantTimes.subList(0, 10);
+
+    // Step 4: pick the LAST archived delta_commit that belongs to Step 1's 
batches as
+    // start commit. This avoids any drift caused by archival ordering or by 
compaction
+    // `commit` instants being interleaved with delta_commits in the archived 
timeline,
+    // and also ignores the Step 2 UPDATE batch in case it also got archived.
+    Set<String> step1InstantTimeSet = new TreeSet<>(batchInstantTimes);
+    List<HoodieInstant> archivedDeltaCommits = 
metaClient.getArchivedTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .filter(instant -> 
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
+        .filter(instant -> 
step1InstantTimeSet.contains(instant.requestedTime()))
+        .getInstants();
+    // make sure archival actually happened on Step 1's batches, otherwise the 
test premise
+    // (the reader hits the archived start commit + fullTableScan branch) does 
not hold.
+    assertTrue(!archivedDeltaCommits.isEmpty(),
+        "archival did not happen as expected on Step 1's batches, archived 
delta commits = "
+            + archivedDeltaCommits + ", Step 1 batch instant times = " + 
batchInstantTimes);
+    HoodieInstant startInstant = 
archivedDeltaCommits.get(archivedDeltaCommits.size() - 1);
+    String archivedStartInstant = startInstant.requestedTime();
+
+    // The expected key set: every Step 1 batch whose instant time is >= 
start_commit contributes
+    // its 2 ids; plus id1..id4 from the Step 2 UPDATE batch (always the 
latest write, never
+    // excluded since its completion time is the largest).
+    int firstIncludedBatchIdx = 
batchInstantTimes.indexOf(archivedStartInstant);
+    assertTrue(firstIncludedBatchIdx >= 0,
+        "chosen start_commit " + archivedStartInstant + " is not one of the 
Step 1 batch instant times " + batchInstantTimes);
+    Set<String> expectedIds = new TreeSet<>();
+    for (int i = firstIncludedBatchIdx; i < batchInstantTimes.size(); i++) {
+      expectedIds.add("id" + (2 * i + 1));
+      expectedIds.add("id" + (2 * i + 2));
+    }
+    // UPDATE batch ids — always present in the merged view because the UPDATE 
is the latest write.
+    expectedIds.add("id1");
+    expectedIds.add("id2");
+    expectedIds.add("id3");
+    expectedIds.add("id4");
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+        .option(FlinkOptions.READ_START_COMMIT, archivedStartInstant)
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+        .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select uuid from t1").collect());
+    Set<String> actualIds = new TreeSet<>();
+    for (Row r : result) {
+      actualIds.add(r.getField(0).toString());
+    }
+    // Without the fix, the FS view used to construct file slices for the 
fallback full-table-scan
+    // branch is built from a compaction-filtered timeline, so log files of 
the file slice that
+    // straddles the compaction commit are silently dropped and some ids would 
be missing from
+    // {@code actualIds}. With the fix, every expected id must be present.
+    assertEquals(expectedIds, actualIds,
+        "Expected id set " + expectedIds + " but got " + actualIds
+            + " when reading from archived start commit " + 
archivedStartInstant
+            + " with skip_compaction = true on a MOR table that has a 
completed compaction commit");
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testStreamReadMorTableWithBucketIndex(boolean partitioned) throws 
Exception {

Reply via email to