gengliangwang opened a new pull request, #55637:
URL: https://github.com/apache/spark/pull/55637
### What changes were proposed in this pull request?
This PR completes the DSv2 CDC streaming post-processing surface by
implementing `deduplicationMode = netChanges` for streaming reads. The previous
PR (#55636 / SPARK-56686) added carry-over removal and update detection for
streaming but left netChanges batch-only.
The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a
Catalyst `Window` partitioned by `rowId` and ordered by `(_commit_version,
change_type_rank)` to find the first and last events per row identity, then
applies the SPIP collapse matrix on `(existedBefore, existsAfter)`. `Window` is
rejected on streaming children (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`),
and, unlike the row-level passes, the netChanges aggregate is keyed by `rowId`
only, so there is no commit-version + commit-timestamp grouping that would let
us reuse the streaming Aggregate pattern.
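The per-row collapse that the batch `Window` computes can be modelled in plain Scala. This is a minimal sketch, not Spark code. `ChangeEvent`, the change-type strings, the within-commit rank and the matrix itself are all assumptions. The matrix follows conventional net-change semantics:

- a row existed before the range if its first event is a delete or pre-image;
- a row exists after the range if its last event is an insert or post-image.

The SPIP's exact matrix may differ.

```scala
// Hypothetical plain-Scala model of the batch netChanges computation; not Spark code.
final case class ChangeEvent(rowId: Long, commitVersion: Long, changeType: String, value: String)

object BatchNetChanges {
  // Assumed ordering within one commit: removals sort before additions.
  private def changeTypeRank(t: String): Int = t match {
    case "delete" | "update_preimage"  => 0
    case "insert" | "update_postimage" => 1
    case other => throw new IllegalArgumentException(s"unknown change type: $other")
  }

  // Assumed reading of the collapse matrix on (existedBefore, existsAfter).
  def collapse(first: ChangeEvent, last: ChangeEvent, computeUpdates: Boolean): Seq[ChangeEvent] = {
    val existedBefore = changeTypeRank(first.changeType) == 0
    val existsAfter   = changeTypeRank(last.changeType) == 1
    (existedBefore, existsAfter) match {
      case (false, false) => Seq.empty // created and removed inside the range: cancels out
      case (false, true)  => Seq(last.copy(changeType = "insert"))
      case (true, false)  => Seq(first.copy(changeType = "delete"))
      case (true, true) if computeUpdates =>
        Seq(first.copy(changeType = "update_preimage"), last.copy(changeType = "update_postimage"))
      case (true, true) =>
        Seq(first.copy(changeType = "delete"), last.copy(changeType = "insert"))
    }
  }

  // Equivalent of a Window partitioned by rowId and ordered by (commitVersion, rank):
  // take the first and last event of each row identity, then collapse them.
  def netChanges(events: Seq[ChangeEvent], computeUpdates: Boolean): Seq[ChangeEvent] =
    events.groupBy(_.rowId).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      val ordered = group.sortBy(e => (e.commitVersion, changeTypeRank(e.changeType)))
      collapse(ordered.head, ordered.last, computeUpdates)
    }
}
```

Only the first and last event of each row identity reach `collapse`. This is why the streaming version needs to keep just two values per key instead of the whole event history.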
This PR adds a streaming-friendly equivalent by delegating per-row-identity
state management to a new `CdcNetChangesStatefulProcessor` driven by
`TransformWithState`:
- The processor stores the first event ever observed and the most-recent
event observed for each row identity in `ValueState[Row]`.
- On each micro-batch, an event-time timer is armed at the latest
`_commit_timestamp` observed for the key. When the global watermark advances
past the timer, `handleExpiredTimer` evaluates the SPIP matrix and emits 0, 1,
or 2 output rows, with the same semantics as the batch path.
- Existing per-key timers are deleted before re-arming so that out-of-order
events for an earlier commit can't fire a stale timer between batches and
produce duplicate output for the same row identity.
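The per-key behavior in the bullets above can be approximated with a small in-memory model. This sketch does not use Spark's `StatefulProcessor` API. The following are all hypothetical:

- the class `NetChangesModel` and its methods;
- the strict `timer < watermark` firing rule, taken from the "advances past" wording;
- the collapse matrix, which uses conventional net-change semantics.

The model keeps the first and most recent event per key and holds one timer per key, which is deleted and re-armed on every batch. Output is emitted only when a timer fires or input ends.

```scala
import scala.collection.mutable

// Hypothetical model of per-row-identity state; not Spark's StatefulProcessor API.
final case class CdcEvent(rowId: Long, commitVersion: Long, commitTs: Long, changeType: String, value: String)

final class NetChangesModel(computeUpdates: Boolean) {
  private val firstEvent = mutable.Map.empty[Long, CdcEvent] // first event ever observed per key
  private val lastEvent  = mutable.Map.empty[Long, CdcEvent] // most recent event observed per key
  private val timers     = mutable.Map.empty[Long, Long]     // at most one event-time timer per key

  private def rank(t: String): Int = if (t == "delete" || t == "update_preimage") 0 else 1

  // Analogue of handleInputRows: update state, then delete and re-arm the key's timer.
  def processBatch(events: Seq[CdcEvent]): Unit =
    events.groupBy(_.rowId).foreach { case (key, rows) =>
      val ordered = rows.sortBy(e => (e.commitVersion, rank(e.changeType)))
      if (!firstEvent.contains(key)) firstEvent(key) = ordered.head
      lastEvent(key) = ordered.last
      val latestTs = (timers.get(key).toSeq ++ rows.map(_.commitTs)).max
      timers.remove(key)     // delete the existing timer first...
      timers(key) = latestTs // ...then re-arm at the latest timestamp seen for the key
    }

  // Analogue of handleExpiredTimer for every timer the watermark has moved strictly past.
  def advanceWatermark(watermark: Long): Seq[CdcEvent] =
    fire(timers.filter(_._2 < watermark).keys.toSeq)

  // End of input: flush every remaining timer.
  def finish(): Seq[CdcEvent] = fire(timers.keys.toSeq)

  private def fire(keys: Seq[Long]): Seq[CdcEvent] = keys.sorted.flatMap { key =>
    val first = firstEvent.remove(key).get
    val last  = lastEvent.remove(key).get
    timers.remove(key)
    (rank(first.changeType) == 0, rank(last.changeType) == 1) match {
      case (false, false) => Nil
      case (false, true)  => Seq(last.copy(changeType = "insert"))
      case (true, false)  => Seq(first.copy(changeType = "delete"))
      case (true, true) if computeUpdates =>
        Seq(first.copy(changeType = "update_preimage"), last.copy(changeType = "update_postimage"))
      case (true, true)   => Seq(first.copy(changeType = "delete"), last.copy(changeType = "insert"))
    }
  }
}
```

A late event for an earlier commit re-arms the existing timer instead of adding a second one at an earlier timestamp. Each row identity therefore yields its collapsed output once, which is the duplicate-output case the delete-before-re-arm step guards against.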
The analyzer rule constructs `TransformWithState` directly (there is no
precedent for this in Catalyst; the typed-Dataset DSL is the usual entry point). Encoders
for the input/output `Row` and the rowId tuple are built via
`ExpressionEncoder(StructType)`. Nested rowId paths (e.g. `payload.id`) are
handled by aliasing each rowId expression to a top-level
`__spark_cdc_rowid_<i>` helper column before the `TransformWithState`, then
dropping the helpers in a final `Project` so the user-visible schema matches
the connector's declared changelog schema.
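The helper-column approach can be illustrated without Catalyst by treating rows as nested Scala maps. In this hypothetical sketch, `RowIdHelpers` and its methods are illustrative names; only the `__spark_cdc_rowid_<i>` naming pattern comes from this PR. The sketch does three things:

1. It resolves each dotted rowId path, such as `payload.id`.
2. It copies the value into a flat top-level column that can serve as a grouping key.
3. It drops the helpers again, so the final row equals the input row.

```scala
// Hypothetical model of the rowId helper-column aliasing; rows are maps, structs are nested maps.
object RowIdHelpers {
  type Row = Map[String, Any]

  // Resolve a dotted path such as "payload.id" against nested maps.
  def extract(row: Row, path: String): Any =
    path.split('.').foldLeft[Any](row) {
      case (m: Map[String, Any] @unchecked, field) => m(field)
      case (other, field) => throw new IllegalArgumentException(s"cannot read '$field' from $other")
    }

  def helperName(i: Int): String = s"__spark_cdc_rowid_$i"

  // First Project: add one flat top-level helper column per rowId expression.
  def addHelpers(row: Row, rowIdPaths: Seq[String]): Row =
    row ++ rowIdPaths.zipWithIndex.map { case (p, i) => helperName(i) -> extract(row, p) }

  // Grouping key for the stateful operator: the helper columns only.
  def groupingKey(row: Row, n: Int): Seq[Any] = (0 until n).map(i => row(helperName(i)))

  // Final Project: drop the helpers so the schema matches the declared changelog schema.
  def dropHelpers(row: Row, n: Int): Row = row -- (0 until n).map(helperName)
}
```

Grouping on flat helper columns keeps the stateful operator's key simple, regardless of how deeply the identity fields are nested in the connector's schema.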
Plan shape:
```
EventTimeWatermark(_commit_timestamp, 0s)
-> Project (alias rowId expressions to flat helper columns)
-> TransformWithState (grouping = rowId helpers, EventTime mode, Append)
-> SerializeFromObject
-> Project (drop rowId helper columns)
```
When carry-over removal / update detection are also requested, the row-level
rewrite is applied first; the netChanges `TransformWithState` then sits on top
of it and the rule reuses the existing `EventTimeWatermark` rather than
stacking another (multi-watermark stacking is rejected unless
`STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set).
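The watermark-reuse decision can be sketched on a toy plan tree. The ADT below is hypothetical and far simpler than Catalyst's nodes. It consists of `Relation`, `RowLevelRewrite`, `NetChanges` and a simplified `EventTimeWatermark`. It shows only the rule itself:

- if no `_commit_timestamp` watermark exists below, add one with a 0 ms delay;
- otherwise reuse the existing one, so the plan never carries two.

```scala
// Hypothetical miniature plan ADT; Catalyst's real nodes carry far more fields.
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class EventTimeWatermark(column: String, delayMs: Long, child: Plan) extends Plan
final case class RowLevelRewrite(child: Plan) extends Plan
final case class NetChanges(child: Plan) extends Plan

object InjectNetChanges {
  // True if the subtree already contains a watermark on the given column.
  def hasWatermark(plan: Plan, column: String): Boolean = plan match {
    case EventTimeWatermark(c, _, child) => c == column || hasWatermark(child, column)
    case RowLevelRewrite(child)          => hasWatermark(child, column)
    case NetChanges(child)               => hasWatermark(child, column)
    case Relation(_)                     => false
  }

  // Add a 0 ms watermark only when none exists below; otherwise reuse the existing one.
  def apply(child: Plan): Plan =
    if (hasWatermark(child, "_commit_timestamp")) NetChanges(child)
    else NetChanges(EventTimeWatermark("_commit_timestamp", 0L, child))

  def countWatermarks(plan: Plan): Int = plan match {
    case EventTimeWatermark(_, _, child) => 1 + countWatermarks(child)
    case RowLevelRewrite(child)          => countWatermarks(child)
    case NetChanges(child)               => countWatermarks(child)
    case Relation(_)                     => 0
  }
}
```

Whether the netChanges node sits alone on a source or on top of a row-level rewrite, the resulting tree contains exactly one watermark.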
#### Documented limitation
Row identities touched only in the latest observed commit are held back
until a later commit (with strictly greater `_commit_timestamp`) advances the
watermark past them, or the source terminates. End-of-input flushes all timers,
so bounded streams produce output equivalent to the corresponding batch read.
This matches the steady-state behavior of the row-level streaming rewrite.
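The hold-back behavior can be checked with a toy micro-batch timeline. This is a hypothetical model, not Spark's scheduler. It assumes:

- a 0s watermark delay;
- an eviction watermark equal to the maximum `_commit_timestamp` of earlier batches;
- timers that fire only when the watermark is strictly greater than the timer;
- a single trailing no-data batch after the last data batch.

The model maps each key to the point at which its net change would be released.

```scala
import scala.collection.mutable

// Hypothetical model of when netChanges output is released; not Spark's scheduler.
object ReleaseTiming {
  sealed trait Release
  final case class InBatch(index: Int) extends Release // fired while processing data batch `index`
  case object NoDataBatch extends Release              // fired by the assumed trailing no-data batch
  case object EndOfInput extends Release               // flushed when the bounded source terminates

  // Each batch is a sequence of (rowId, _commit_timestamp) pairs.
  def schedule(batches: Seq[Seq[(String, Long)]]): Map[String, Release] = {
    val timers    = mutable.Map.empty[String, Long] // one event-time timer per key
    val released  = mutable.Map.empty[String, Release]
    var watermark = Long.MinValue                   // 0s delay: max timestamp of earlier batches

    // Fire every timer the watermark has moved strictly past.
    def evict(at: Release): Unit = {
      val expired = timers.collect { case (k, ts) if ts < watermark => k }.toList
      expired.foreach { k => released.update(k, at); timers.remove(k) }
    }

    batches.zipWithIndex.foreach { case (batch, i) =>
      // Re-arm each touched key at the latest timestamp seen for it.
      batch.foreach { case (k, ts) => timers(k) = math.max(timers.getOrElse(k, ts), ts) }
      evict(InBatch(i))
      if (batch.nonEmpty) watermark = math.max(watermark, batch.map(_._2).max)
    }
    evict(NoDataBatch)
    timers.keys.foreach(k => released.update(k, EndOfInput)) // latest-commit keys wait for end of input
    released.toMap
  }
}
```

Consider two batches, `[(a, 10), (b, 10)]` and then `[(b, 20), (c, 20)]`. Key `a` is released by the trailing no-data batch. Keys `b` and `c` were touched in the latest commit, so they wait for end of input.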
Also removes the now-obsolete error class
`INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` introduced in
SPARK-56686.
### Why are the changes needed?
Without this PR, `deduplicationMode = netChanges` is unavailable on
streaming CDC reads, forcing users with intermediate-state connectors
(`containsIntermediateChanges = true`) to fall back to batch reads when they
want a deduplicated change feed. With SPARK-56686 already shipping carry-over
removal and update detection for streaming, netChanges was the only
post-processing pass still gated to batch -- this completes the surface.
### Does this PR introduce _any_ user-facing change?
Yes.
- Streaming `spark.readStream.changes(...)` now supports `deduplicationMode
= netChanges`. Previously this threw
`INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`.
- That error class is removed; the Scaladoc for `DataStreamReader.changes()`
and the Javadoc in `Changelog.java` have been updated to describe the supported
behavior and the latest-commit limitation.
Note: the netChanges streaming path uses `TransformWithState`, which
requires the RocksDB state store backend
(`spark.sql.streaming.stateStore.providerClass =
...RocksDBStateStoreProvider`). Spark surfaces
`UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS` if the default
HDFS-backed provider is left in place, so this is discoverable.
### How was this patch tested?
All 89 tests across the following 4 CDC suites pass:
- `ResolveChangelogTableStreamingPostProcessingSuite` -- two new plan-shape
tests: `netChanges alone injects watermark + TransformWithState` and
`netChanges + carry-over removal share a single watermark` (verifies that the
netChanges `TransformWithState` reuses the row-level rewrite's
`EventTimeWatermark` rather than stacking another).
- `ChangelogResolutionSuite` -- the `netChanges throws` test from
SPARK-56686 is flipped to assert that exactly one `TransformWithState` appears
in the analyzed plan.
- `ResolveChangelogTablePostProcessingSuite` -- the corresponding netChanges
throw test is similarly flipped.
- `ChangelogEndToEndSuite` -- two new end-to-end tests that drive a
streaming query against `InMemoryChangelogCatalog` with the RocksDB state
store: `streaming netChanges collapses INSERT then DELETE to no output`
(confirms the `(false, false)` cancel case and that end-of-input flushes the
latest commit's group) and `streaming netChanges with computeUpdates labels
persisting rows as updates` (confirms the `(false, true)` case relabels
correctly).
Also confirmed that `UnsupportedOperationsSuite` (216 tests) still passes.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)