SanJSp opened a new pull request, #55508: URL: https://github.com/apache/spark/pull/55508
This is **PR 2 of a split** of #55426 (see the [split suggestion](https://github.com/apache/spark/pull/55426#issuecomment-4292375876) for the full plan). Independent of PR 1; the two can merge in any order.

Introduce the analyzer rule that post-processes a resolved `DataSourceV2Relation(ChangelogTable)` to inject carry-over removal and/or update detection, fused into a single pass over a `(rowId, _commit_version)`-partitioned Window. Also includes an explicit rejection path for streaming CDC reads that would require post-processing, to prevent silent wrong results.

- `ResolveChangelogTable` analyzer rule:
  - **Batch**: applies the requested post-processing transformations. Carry-over removal is a `Filter` on the Window (drops CoW pairs where `min(rowVersion) == max(rowVersion)`); update detection is a `CASE WHEN` over delete/insert counts (relabels pairs as `update_preimage` / `update_postimage`). The two passes are fused into a single Window.
  - **Streaming**: throws `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` when the requested options would need post-processing. Streams that don't need post-processing pass through unchanged. Actual streaming support is scoped to a follow-up PR.
  - **Net changes**: throws `INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED` for both batch and streaming. Actual implementation is scoped to a follow-up PR.
  - Option validation: throws `INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL` when `computeUpdates = true` is combined with a carry-over-surfacing connector and `deduplicationMode = none`, which would silently misclassify carry-overs as updates.
  - Runtime guard: the generated plan raises `INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION` when the connector emits more than one delete or insert for the same `(rowId, _commit_version)` partition, violating the `Changelog` contract.
- `Analyzer`: register the rule after `ResolveRelations`.
- `InMemoryChangelogCatalog`: `ChangelogProperties` extension so tests can configure post-processing scenarios without a real connector.

### Why are the changes needed?

Currently `CHANGES FROM VERSION ... WITH (deduplicationMode = ..., computeUpdates = ...)` parses the options but they are silently ignored: connector output is returned raw. This PR wires the options to their actual semantics for batch reads, and prevents silent wrong results for streaming reads.

### Does this PR introduce _any_ user-facing change?

Yes, for CDC queries against a `Changelog` connector.

<details>
<summary>Before/after example (click to expand)</summary>

Given a `Changelog` connector that advertises both `containsCarryoverRows = true` and `representsUpdateAsDeleteAndInsert = true`, with rowId `id` and a `rowVersion` column, for versions 1–2:

**Raw rows emitted by the connector:**

```
1 | Alice  | insert | 1
2 | Bob    | insert | 1
3 | Carol  | insert | 1
1 | Alice  | delete | 2   -- part of rename Alice -> Alicia
1 | Alicia | insert | 2   -- part of rename Alice -> Alicia
2 | Bob    | delete | 2   -- carry-over (CoW, row unchanged)
2 | Bob    | insert | 2   -- carry-over (CoW, row unchanged)
3 | Carol  | delete | 2   -- real delete
```

**Before this PR**: `WITH (computeUpdates = 'true')` is silently ignored, carry-overs leak through:

```
1 | Alice  | insert | 1
2 | Bob    | insert | 1
3 | Carol  | insert | 1
1 | Alice  | delete | 2
1 | Alicia | insert | 2
2 | Bob    | delete | 2
2 | Bob    | insert | 2
3 | Carol  | delete | 2
```

**After this PR**, `WITH (computeUpdates = 'true')`:

```
1 | Alice  | insert           | 1
2 | Bob    | insert           | 1
3 | Carol  | insert           | 1
1 | Alice  | update_preimage  | 2
1 | Alicia | update_postimage | 2
3 | Carol  | delete           | 2
```

</details>

### How was this patch tested?
`ResolveChangelogTablePostProcessingSuite` exercises the batch rule end-to-end via SQL against `InMemoryChangelogCatalog` (carry-over removal, update detection, their interaction across the option and connector-flag matrix, data-column handling with mixed types, and plan-shape invariants). `ChangelogResolutionSuite` adds streaming-rejection cases for the two capability flags that would require post-processing.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
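
For reviewers who want to sanity-check the batch semantics without reading the Catalyst plan, here is a minimal plain-Python sketch of the fused pass described in the change list. It is not the rule's implementation: `Change`, `post_process`, and the `row_version` values are hypothetical, and it assumes a "pair" means exactly one delete and one insert in the same `(rowId, _commit_version)` partition. Only the error name and the labels come from the description above.

```python
from collections import defaultdict
from typing import NamedTuple


class Change(NamedTuple):
    row_id: int
    name: str
    change_type: str     # "insert" or "delete", as emitted by the connector
    commit_version: int
    row_version: int     # assumed values; the example does not list them


def post_process(rows, drop_carry_overs=True, compute_updates=True):
    """Hypothetical helper: one pass over (row_id, commit_version) partitions."""
    partitions = defaultdict(list)   # dicts keep first-seen key order
    for row in rows:
        partitions[(row.row_id, row.commit_version)].append(row)

    result = []
    for part in partitions.values():
        deletes = sum(r.change_type == "delete" for r in part)
        inserts = sum(r.change_type == "insert" for r in part)
        # Runtime guard: at most one delete and one insert per partition.
        if deletes > 1 or inserts > 1:
            raise ValueError("UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION")
        is_pair = deletes == 1 and inserts == 1
        versions = [r.row_version for r in part]
        # Carry-over: a delete/insert pair whose row version did not change.
        if drop_carry_overs and is_pair and min(versions) == max(versions):
            continue
        for r in part:
            if compute_updates and is_pair:
                # Update detection: relabel the surviving pair.
                label = ("update_preimage" if r.change_type == "delete"
                         else "update_postimage")
                r = r._replace(change_type=label)
            result.append(r)
    return result


RAW = [
    Change(1, "Alice", "insert", 1, 1),
    Change(2, "Bob", "insert", 1, 1),
    Change(3, "Carol", "insert", 1, 1),
    Change(1, "Alice", "delete", 2, 1),   # rename Alice -> Alicia
    Change(1, "Alicia", "insert", 2, 2),  # new row version after the rename
    Change(2, "Bob", "delete", 2, 1),     # carry-over: same row version
    Change(2, "Bob", "insert", 2, 1),
    Change(3, "Carol", "delete", 2, 1),   # real delete
]

for change in post_process(RAW):
    print(*change[:4], sep=" | ")
```

With both flags on, the sketch drops Bob's pair and relabels the Alice/Alicia pair, matching the "After this PR" listing. Calling `post_process(RAW, drop_carry_overs=False)` relabels Bob's unchanged pair as an update, which is the misclassification that `UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL` is meant to reject up front.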
