gengliangwang opened a new pull request, #55636:
URL: https://github.com/apache/spark/pull/55636

   ### What changes were proposed in this pull request?
   
   This PR implements row-level CDC post-processing (carry-over removal and 
update detection) for DSv2 streaming reads. Previously, streaming `changes()` 
rejected any post-processing with a blanket 
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` error.
   
   The batch path (added in #55508 and #55583) uses a Catalyst `Window` keyed 
by `(rowId, _commit_version)`, which `UnsupportedOperationChecker` rejects on 
streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`). The streaming 
rewrite in `ResolveChangelogTable` now expresses the same logic with 
streaming-allowed primitives:
   
   ```
   EventTimeWatermark(_commit_timestamp, 0s)
     -> Aggregate keyed by (rowId..., _commit_version, _commit_timestamp)
          (count_if delete/insert, [min/max/count rowVersion,] collect_list(struct(*)))
     -> [Filter on the carry-over predicate]
     -> Generate(Inline(events))
     -> [Project relabeling _change_type for delete+insert pairs]
     -> Project dropping __spark_cdc_* helpers
   ```
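
The plan above can be modeled outside Spark as a plain-Python function over a finite list of change rows. This is a hedged sketch of the semantics only, not the Catalyst rule. Watermarking and state eviction are not modeled. The `_row_version` column and the `update_preimage`/`update_postimage` labels are hypothetical names. The carry-over predicate (exactly one delete and one insert with an unchanged row version) is an assumption about what the `min/max/count rowVersion` aggregates test.

```python
from collections import defaultdict

# Hypothetical labels; the real changelog schema may use different values.
DELETE, INSERT = "delete", "insert"
PRE, POST = "update_preimage", "update_postimage"


def post_process(rows, row_id_cols, drop_carryovers=True, compute_updates=True):
    """Model of the streaming rewrite over one finite batch of change rows."""
    # Aggregate keyed by (rowId..., _commit_version, _commit_timestamp);
    # each group's list plays the role of collect_list(struct(*)).
    groups = defaultdict(list)
    for r in rows:
        key = tuple(r[c] for c in row_id_cols)
        groups[key + (r["_commit_version"], r["_commit_timestamp"])].append(r)

    out = []
    for events in groups.values():
        # count_if delete/insert, plus the row-version aggregates.
        n_del = sum(e["_change_type"] == DELETE for e in events)
        n_ins = sum(e["_change_type"] == INSERT for e in events)
        versions = [e["_row_version"] for e in events]
        is_pair = n_del == 1 and n_ins == 1

        # Filter: drop a delete+insert pair whose row version did not change
        # (assumed carry-over predicate).
        if drop_carryovers and is_pair and min(versions) == max(versions):
            continue

        # Generate(Inline(events)) re-emits the group's rows; the optional
        # Project relabels a surviving delete+insert pair as an update.
        for e in events:
            e = dict(e)  # copy, so the caller's rows are not mutated
            if compute_updates and is_pair:
                e["_change_type"] = PRE if e["_change_type"] == DELETE else POST
            out.append(e)
    return out
```

The per-group counts live only in local variables here, mirroring the final Project that drops the `__spark_cdc_*` helper columns. For a commit where row 1 is rewritten unchanged (a CoW carry-over) and row 2's value changes, `post_process(rows, ("id",))` drops row 1's pair and returns row 2's pair relabeled as an update.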
   
   Including `_commit_timestamp` in the grouping keys is required to satisfy
the Append-mode streaming aggregation contract (the watermark attribute must
appear among the grouping expressions). By CDC convention all rows in a single
commit share `_commit_timestamp`, so adding it to the keys yields exactly the
same groups as the batch `(rowId, _commit_version)` grouping.
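
The no-op claim can be checked mechanically. The hedged sketch below (plain Python; the `id` row-identifier column is a hypothetical name) compares the groups induced by the batch keys with those induced by the streaming keys. They coincide whenever every `(id, _commit_version)` group carries a single `_commit_timestamp`, and diverge as soon as one commit carries two timestamps.

```python
from collections import defaultdict


def partition(rows, keys):
    """Return the groups induced by `keys`, as frozensets of row indices."""
    groups = defaultdict(set)
    for i, r in enumerate(rows):
        groups[tuple(r[k] for k in keys)].add(i)
    return {frozenset(g) for g in groups.values()}


def timestamp_key_is_noop(rows):
    """True iff adding _commit_timestamp to the batch keys changes no group."""
    batch_keys = ("id", "_commit_version")
    stream_keys = batch_keys + ("_commit_timestamp",)
    return partition(rows, batch_keys) == partition(rows, stream_keys)
```

Adding a grouping key can only split groups, never merge them, so equality here holds exactly when no commit spans two timestamps for the same row.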
   
   `deduplicationMode = netChanges` is still rejected: net change computation
partitions by `rowId` alone and reasons over the entire requested range, so its
result for a row can depend on commits that arrive in different micro-batches.
The existing error class
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is replaced with
the more specific `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`,
which names the offending option and points users at the supported streaming
alternatives.
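
A small model shows why `netChanges` is cross-batch. The hedged sketch below uses a deliberately simplified net-change rule: per row, emit the state before the first event and after the last event over the whole range. This rule and the `id` column are assumptions for illustration, not Spark's implementation. A row inserted at version 1 and deleted at version 2 nets to nothing over the full range. If the two versions land in different micro-batches, however, a per-batch computation emits an insert and then a delete.

```python
from itertools import groupby


def net_changes(rows):
    """Simplified net-change model over one range of change rows."""
    out = []
    # Stable sort, so events within one version keep their input order.
    ordered = sorted(rows, key=lambda r: (r["id"], r["_commit_version"]))
    for _, group in groupby(ordered, key=lambda r: r["id"]):
        events = list(group)
        first, last = events[0], events[-1]
        if first["_change_type"] == "delete":  # row existed before the range
            out.append(first)
        if last["_change_type"] == "insert":   # row exists after the range
            out.append(last)
    return out
```

In Append output mode, the insert already emitted by the first micro-batch cannot be retracted when the delete arrives later, so a per-batch rewrite cannot reproduce the full-range answer.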
   
   Doc updates:
   - `Changelog.java` clarifies that all rows of a single `_commit_version` 
must share `_commit_timestamp`, and that streaming reads expect non-decreasing 
`_commit_timestamp` across micro-batches.
   - `Changelog.java` notes that `containsIntermediateChanges()` is 
range-scoped, hence the streaming limitation for `netChanges`.
   - `DataStreamReader.changes()` Scaladoc lists the `netChanges` streaming 
limitation.
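
The documented `_commit_timestamp` contract can be expressed as a check over micro-batches. This hedged sketch reads "non-decreasing across micro-batches" as "no row's `_commit_timestamp` is older than the largest timestamp of an earlier micro-batch". That reading and the function name are assumptions. One plausible reason for the ordering rule is that the rewrite applies an `EventTimeWatermark` with a 0s delay, and Spark's streaming aggregation may drop rows that fall behind the watermark as late data.

```python
def contract_violations(micro_batches):
    """List violations of the _commit_timestamp contract, in arrival order."""
    violations = []
    ts_by_version = {}  # first timestamp seen for each _commit_version
    prev_max = None     # largest timestamp of all earlier micro-batches
    for b, batch in enumerate(micro_batches):
        for r in batch:
            v, ts = r["_commit_version"], r["_commit_timestamp"]
            first_ts = ts_by_version.setdefault(v, ts)
            if first_ts != ts:
                violations.append(f"batch {b}: version {v} has timestamps {first_ts} and {ts}")
            if prev_max is not None and ts < prev_max:
                violations.append(f"batch {b}: timestamp {ts} is older than {prev_max}")
        if batch:
            batch_max = max(r["_commit_timestamp"] for r in batch)
            prev_max = batch_max if prev_max is None else max(prev_max, batch_max)
    return violations
```

Within a single micro-batch no ordering is required; only the per-version timestamp and the cross-batch high-water mark are checked.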
   
   ### Why are the changes needed?
   
   Without this PR, any streaming CDC read against a connector that emits CoW
carry-over pairs (`containsCarryoverRows = true`) or represents updates as raw
delete+insert (`representsUpdateAsDeleteAndInsert = true`) raises an analysis
error, forcing users to fall back to batch reads. The batch-only restriction is
unnecessary for these passes: both group rows within a single commit version,
so they need no cross-version state. It also surprises users, since the same
options work on batch reads.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes.
   - Streaming `spark.readStream.changes(...)` now supports `computeUpdates = true`
and `deduplicationMode = dropCarryovers`. Previously these threw
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED`.
   - The error class 
`INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is renamed to 
`INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` with a more specific 
message. The new error fires only for `deduplicationMode = netChanges` on 
streaming reads.
   - `DataStreamReader.changes()` Scaladoc is updated accordingly.
   - `Changelog.java` Javadoc clarifies the `_commit_timestamp` contract for
streaming.
   
   ### How was this patch tested?
   
   86 tests across 4 CDC suites (all passing):
   
   - `ResolveChangelogTableStreamingPostProcessingSuite` (new, 5 tests) -- 
plan-shape assertions covering carry-over only, update detection only, both 
fused, and the no-rewrite pass-through cases. Verifies the `EventTimeWatermark` 
+ `Aggregate` + `Generate(Inline)` rewrite shape.
   - `ChangelogResolutionSuite` -- the two existing streaming throw-tests are 
flipped to plan-shape assertions; a new test covers the `netChanges` streaming 
throw.
   - `ResolveChangelogTablePostProcessingSuite` -- the existing streaming throw 
test is updated to cover the `netChanges`-only case.
   - `ChangelogEndToEndSuite` -- three new streaming end-to-end tests using 
`InMemoryChangelogCatalog`: carry-over removal drops CoW pairs, update 
detection relabels delete+insert as update, and `netChanges` throws.
   
   Also confirmed `UnsupportedOperationsSuite` (216 tests) still passes -- the 
rewritten plan does not contain `Window` or any other streaming-rejected 
operator.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-7)

