yihua opened a new pull request, #18949:
URL: https://github.com/apache/hudi/pull/18949
### Change Logs

S3/GCS cloud-object incremental sources can silently drop records whenever a previous batch persisted a `commit#fileKey` mid-commit-pagination checkpoint (i.e., the prior batch hit `sourceLimit` before exhausting the start commit's files). Files in the start commit that sort after the checkpoint key become unreachable, and the persisted checkpoint advances past them as a bare instant.

**Root cause.** `QueryRunner.runIncrementalQuery` passes `queryInfo.getStartInstant()` as the Spark `START_COMMIT`. The Spark incremental relation filters the source timeline via `findInstantsInRange`, which is `(start, end]` (start-exclusive), so the start commit is dropped from the scan. The downstream `(commit_time || object_key) > 'commit#fileKey'` filter in `IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit` then matches nothing in the start commit, the empty-batch branch fires, and the new checkpoint is emitted as `endInstant` with no `#fileKey` suffix. The next batch resumes past the gap.

**Fix.** Pass `queryInfo.getPreviousInstant()` so that the resulting scan range `(previousInstant, end]` includes the desired start commit while preserving the relation's start-exclusive semantics. This is required for cloud-object sources, whose `commit#fileKey` pagination depends on re-scanning the start commit to find files past the persisted key.

### Impact

- Affected sources: `S3EventsHoodieIncrSource` / `GcsEventsHoodieIncrSource`
- Triggered when at least one upstream source-events commit exceeds `hoodie.deltastreamer.read.source.limit`.

Common triggers: cold-start backfills against a source table with a large initial commit, bursty event writers, and low `sourceLimit` overrides. Steady-state streams whose upstream commits fit within `sourceLimit` are unaffected. `HoodieIncrSource` (non-cloud) does not go through `QueryRunner` and is unaffected.
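The root-cause chain above can be reproduced in a small, hypothetical Java model. It assumes fixed-width instant strings that sort lexicographically, an in-memory list standing in for the events meta-table, and a `sourceLimit` treated as a byte budget over object sizes. `FileEvent`, `inRange`, `nextBatch`, `resumeFrom002a`, and the sample commits `001`/`002` are illustrative names only, not Hudi's `QueryRunner` or `IncrSourceHelper` APIs, and the paging loop is a simplification rather than Hudi's actual implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model of mid-commit pagination; not Hudi code.
public class MidCommitPaginationSketch {

  // One source-events row: the commit that wrote it, the object key, and its size.
  record FileEvent(String commitTime, String objectKey, long size) {
    // Assumes fixed-width instants, so plain concatenation sorts by (commit, key).
    String sortKey() {
      return commitTime + objectKey;
    }
  }

  // Start-exclusive, end-inclusive check, mirroring the (start, end] range semantics.
  static boolean inRange(String instant, String start, String end) {
    return instant.compareTo(start) > 0 && instant.compareTo(end) <= 0;
  }

  // Scans (scanStart, endInstant], keeps rows past the checkpoint, and pages by sourceLimit.
  // Appends the selected rows to 'out' and returns the checkpoint to persist.
  static String nextBatch(List<FileEvent> events, String scanStart, String endInstant,
      String ckptCommit, String ckptKey, long sourceLimit, List<FileEvent> out) {
    String prefix = ckptCommit + ckptKey;
    List<FileEvent> candidates = events.stream()
        .filter(e -> inRange(e.commitTime(), scanStart, endInstant))
        .filter(e -> e.sortKey().compareTo(prefix) > 0)
        .sorted(Comparator.comparing(FileEvent::sortKey))
        .collect(Collectors.toList());
    if (candidates.isEmpty()) {
      // Empty-batch branch: a bare instant with no #fileKey suffix.
      return endInstant;
    }
    long total = 0;
    FileEvent last = null;
    for (FileEvent e : candidates) {
      // This sketch always takes the first row so an oversized file cannot stall it.
      if (last != null && total + e.size() > sourceLimit) {
        break;
      }
      total += e.size();
      out.add(e);
      last = e;
    }
    return last.commitTime() + "#" + last.objectKey();
  }

  // Commit 002 has files a, b, c; a prior batch stopped after a (checkpoint 002#a).
  static final List<FileEvent> EVENTS = List.of(
      new FileEvent("001", "x", 10),
      new FileEvent("002", "a", 10),
      new FileEvent("002", "b", 10),
      new FileEvent("002", "c", 10));

  // Resumes from 002#a with 002 as the end of the fetch window and a 10-byte limit.
  static String resumeFrom002a(String scanStart, List<FileEvent> out) {
    return nextBatch(EVENTS, scanStart, "002", "002", "a", 10, out);
  }

  static void report(String label, String scanStart) {
    List<FileEvent> out = new ArrayList<>();
    String checkpoint = resumeFrom002a(scanStart, out);
    List<String> keys = out.stream().map(FileEvent::objectKey).collect(Collectors.toList());
    System.out.println(label + " -> checkpoint " + checkpoint + ", files " + keys);
  }

  public static void main(String[] args) {
    report("bug: START_COMMIT = start instant 002", "002");    // checkpoint 002, files []
    report("fix: START_COMMIT = previous instant 001", "001"); // checkpoint 002#b, files [b]
  }
}
```

In this model the bug path persists the bare checkpoint `002`, so the next batch starts strictly after commit `002` and files `b` and `c` are never read. Scanning from the previous instant widens the range just enough to re-read commit `002`, and pagination continues to `002#b`.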
The behavior change is limited to (a) the `commit#fileKey` mid-commit-pagination case, which now correctly re-scans the start commit, and (b) bare-commit resumption, which also re-scans the start commit. Case (b) is safe because a bare-commit checkpoint is only emitted when the start commit had 0 matching rows after the prefix filter, so re-scanning immutable source data yields the same 0 rows.

### Risk level

low

### Documentation Update

None.

### Contributor's checklist

- [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Change Logs and Impact were stated clearly
- [x] Adequate tests were added. The new `testRealQueryRunnerResumesMidCommitPagination` in both `TestS3EventsHoodieIncrSource` and `TestGcsEventsHoodieIncrSource` exercises a real `QueryRunner` against an on-disk Hudi events meta-table, resuming from a mid-commit `commit#fileKey` checkpoint with a `sourceLimit` smaller than the remaining files. It asserts both the next persisted checkpoint and the exact files passed downstream (via a captor on `loadAsDataset`). The existing tests mocked `QueryRunner.run()` to return `inputDs` unfiltered for incremental queries and could not catch a `START_COMMIT`-handling regression.
- [ ] CI passed
