johanl-db commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3207379788


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java:
##########
@@ -33,8 +33,12 @@
  * <ul>
  *   <li>{@code _change_type} (STRING) — the kind of change: {@code insert}, {@code delete},
  *       {@code update_preimage}, or {@code update_postimage}</li>
- *   <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version containing
- *       this change</li>
+ *   <li>{@code _commit_version} — the commit version containing this change. Must be of
+ *       an atomic orderable type (e.g. {@code LongType}, {@code StringType},

Review Comment:
   I think that may be too strict. The current contract is that the ordering for `_commit_version` is connector-defined.
   When a user requests `CHANGES FROM VERSION 'abc' TO VERSION 'def'`, the connector must be able to resolve 'abc' -> 'def' into a range of versions, but Spark may not know how to compare two arbitrary versions.
   
   The previous change (https://github.com/apache/spark/pull/55636) uses `_commit_timestamp` for ordering instead of `_commit_version`; we may want to do the same here to avoid imposing unnecessary constraints on the connector.
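To make the ordering concern concrete, here is a minimal sketch in plain Java (not Spark or connector API). It assumes a hypothetical `ChangeRow` shape with an opaque string `commitVersion` and a `commitTimestampMicros` field. Comparing version strings directly can disagree with commit order (for example `"v10"` sorts before `"v9"`), whereas ordering by `_commit_timestamp` needs no knowledge of the connector's version format.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CommitOrdering {
  // Hypothetical changelog row: the version is an opaque connector-defined string,
  // the timestamp is the commit time in microseconds.
  record ChangeRow(String changeType, String commitVersion, long commitTimestampMicros) {}

  // Orders rows by _commit_timestamp only; never compares _commit_version values,
  // whose ordering only the connector knows.
  static List<ChangeRow> orderByTimestamp(List<ChangeRow> rows) {
    List<ChangeRow> sorted = new ArrayList<>(rows);
    sorted.sort(Comparator.comparingLong(ChangeRow::commitTimestampMicros));
    return sorted;
  }

  // Naive alternative that assumes the version string itself is meaningfully orderable.
  static List<ChangeRow> orderByVersionString(List<ChangeRow> rows) {
    List<ChangeRow> sorted = new ArrayList<>(rows);
    sorted.sort(Comparator.comparing(ChangeRow::commitVersion));
    return sorted;
  }

  public static void main(String[] args) {
    List<ChangeRow> rows = List.of(
        new ChangeRow("insert", "v10", 2_000L),
        new ChangeRow("delete", "v9", 1_000L));
    // "v10" < "v9" lexicographically, even though v9 committed first.
    System.out.println(orderByVersionString(rows).get(0).commitVersion()); // v10
    System.out.println(orderByTimestamp(rows).get(0).commitVersion());     // v9
  }
}
```

Only the connector can map an opaque version such as `'abc'` to a position in its history; the timestamp column gives Spark a total order it can rely on without that knowledge.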



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.connector.catalog.Changelog
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * StatefulProcessor that incrementalises CDC net-change computation for streaming reads.
+ *
+ * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
+ * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
+ * extract the first and last events per row identity, then applies the SPIP collapse
+ * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
+ * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+ *
+ * This processor reuses the same SPIP collapse matrix with `transformWithState`, applied
+ * per watermark window rather than over the full requested version range. Per-row-identity
+ * state stores the first event ever observed and the most-recent event observed; an event
+ * time timer keyed on `_commit_timestamp` advances with each batch and fires once the
+ * global watermark passes the latest event time observed for the key, at which point the
+ * SPIP matrix is evaluated and the net result is emitted. See the paragraph below for how
+ * the per-window collapse differs from batch netChanges' range-scoped collapse.
+ *
+ * Output schema: identical to the connector's changelog schema.
+ *
+ * Streaming netChanges is incremental: per-row-identity state is cleared once its current
+ * net result is emitted (timer fire or end-of-stream flush). Subsequent commits on the same

Review Comment:
   We compute net changes over single commits; I'm wondering how useful that really is in practice. I'm assuming most connectors (true at least for Delta and Iceberg) won't produce redundant changes within the same commit.
   
   It could be more useful to let the user configure the window over which changes are collapsed (time-based or number of versions), at the cost of latency.
   We should consider whether it really makes sense to allow net changes in streaming in its current form in 4.2: it's not particularly useful and may prevent us from changing its behavior later down the line. I imagine marking it as `Experimental`, as it currently is, is enough to signal that this may indeed change in the future.
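The configurable window suggested above can be sketched in plain Java. Everything here is hypothetical: `Event`, `commitOrdinal` (a 0-based commit position used to bucket commits into windows of `windowSize` versions), and the within-commit rank are assumptions. The collapse rules are an assumed reading of the SPIP matrix on `(existedBefore, existsAfter)` described in the processor's Scaladoc, not the PR's code. With `windowSize = 1`, an insert in one commit and a delete in the next are emitted separately; with `windowSize = 2`, they cancel out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class WindowedNetChanges {
  // Hypothetical input and output shapes for this sketch.
  record Event(String rowId, String changeType, long commitOrdinal, String value) {}
  record NetChange(String rowId, String changeType, String value) {}

  static final Set<String> BEFORE_IMAGES = Set.of("delete", "update_preimage");
  static final Set<String> AFTER_IMAGES = Set.of("insert", "update_postimage");

  // Assumed within-commit rank: "before" images sort ahead of "after" images.
  static int rank(String changeType) {
    return switch (changeType) {
      case "delete", "update_preimage" -> 0;
      case "insert", "update_postimage" -> 1;
      default -> throw new IllegalArgumentException("unknown change type: " + changeType);
    };
  }

  static List<NetChange> collapse(List<Event> events, long windowSize) {
    if (windowSize <= 0) throw new IllegalArgumentException("windowSize must be positive");
    Comparator<Event> order = Comparator.comparingLong(Event::commitOrdinal)
        .thenComparingInt(e -> rank(e.changeType()));
    // Group by (window index, rowId); TreeMaps keep the output order deterministic.
    Map<Long, Map<String, List<Event>>> groups = new TreeMap<>();
    for (Event e : events) {
      groups.computeIfAbsent(e.commitOrdinal() / windowSize, w -> new TreeMap<>())
          .computeIfAbsent(e.rowId(), r -> new ArrayList<>())
          .add(e);
    }
    List<NetChange> out = new ArrayList<>();
    for (Map<String, List<Event>> byRow : groups.values()) {
      for (List<Event> rowEvents : byRow.values()) {
        rowEvents.sort(order);
        Event first = rowEvents.get(0);
        Event last = rowEvents.get(rowEvents.size() - 1);
        boolean existedBefore = BEFORE_IMAGES.contains(first.changeType());
        boolean existsAfter = AFTER_IMAGES.contains(last.changeType());
        if (existedBefore && existsAfter) {
          out.add(new NetChange(first.rowId(), "update_preimage", first.value()));
          out.add(new NetChange(last.rowId(), "update_postimage", last.value()));
        } else if (existedBefore) {
          out.add(new NetChange(first.rowId(), "delete", first.value()));
        } else if (existsAfter) {
          out.add(new NetChange(last.rowId(), "insert", last.value()));
        }
        // Neither: the row was created and removed inside the window, so nothing is emitted.
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(
        new Event("k1", "insert", 0, "a"),
        new Event("k1", "delete", 1, "a"),
        new Event("k2", "update_preimage", 0, "x"),
        new Event("k2", "update_postimage", 0, "y"),
        new Event("k2", "update_preimage", 1, "y"),
        new Event("k2", "update_postimage", 1, "z"));
    System.out.println(collapse(events, 1)); // per-commit collapse
    System.out.println(collapse(events, 2)); // two-commit window
  }
}
```

Larger windows collapse more redundant changes but hold results back longer, which is the latency trade-off the comment raises.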
   



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java:
##########
@@ -61,15 +65,16 @@
  *       snapshots) that derive {@code _commit_timestamp} from wall-clock time at
  *       commit time naturally satisfy both requirements.
  *       {@code _commit_timestamp} must be non-{@code NULL} on every row of a streaming
- *       read engaging post-processing. The row-level rewrite raises
- *       {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP} on any row that
- *       violates this; without the guard a NULL group key would never satisfy the
- *       watermark eviction predicate and the row would sit in state indefinitely</li>
+ *       read engaging post-processing; both the row-level Aggregate path and the
+ *       netChanges {@code transformWithState} path raise
+ *       {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP} on a violation</li>
  * </ul>
  * <p>
- * Streaming reads support carry-over removal and update detection but not net change
- * computation. The latter requires reasoning over the entire requested range and is
- * batch-only.
+ * Streaming reads support carry-over removal, update detection, and net change
+ * computation. Net change collapses are kept in the state store keyed by row identity;
+ * row identities only touched in the latest observed commit are held back until either a
+ * later commit (with strictly greater `_commit_timestamp`) advances the global watermark
+ * past them, or the source terminates.

Review Comment:
   Doesn't the contract with connectors currently guarantee that changes from a commit won't span multiple batches?
   In https://github.com/apache/spark/pull/55636, we're able to produce changes without a one-commit delay, so we should be able to do the same here, right?
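The difference between the two emission strategies can be modelled with a small, simplified Java sketch. It assumes zero watermark delay and treats the watermark as the maximum `_commit_timestamp` seen so far, ignoring how Spark actually propagates watermarks across micro-batches; `Row`, `WatermarkEmitter`, and `batchEnd` are hypothetical names. The watermark-based emitter holds each row until a commit with a strictly greater timestamp arrives (or the stream is flushed), while the batch-end emitter releases everything immediately, which is only safe if a commit's changes never span micro-batches. Both reject a NULL timestamp, reusing the error-class name from the Javadoc purely as a message.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class EmissionDelay {
  // Hypothetical row carrying only what the emission model needs.
  record Row(String rowId, Long commitTimestamp) {}

  // Rejects rows violating the non-NULL _commit_timestamp contract. The message reuses the
  // error-class name from the Javadoc; the exception type is an assumption.
  static void requireTimestamp(Row r) {
    if (r.commitTimestamp() == null) {
      throw new IllegalStateException(
          "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP for row " + r.rowId());
    }
  }

  // Simplified watermark model: zero delay, watermark = max timestamp seen so far.
  // A row is released only once a strictly later commit moves the watermark past it.
  static final class WatermarkEmitter {
    private final List<Row> held = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    List<Row> onBatch(List<Row> batch) {
      batch.forEach(EmissionDelay::requireTimestamp); // validate before buffering anything
      for (Row r : batch) {
        held.add(r);
        watermark = Math.max(watermark, r.commitTimestamp());
      }
      List<Row> released = new ArrayList<>();
      for (Iterator<Row> it = held.iterator(); it.hasNext(); ) {
        Row r = it.next();
        if (r.commitTimestamp() < watermark) {
          released.add(r);
          it.remove();
        }
      }
      return released;
    }

    // End-of-stream flush: everything still held is emitted.
    List<Row> flush() {
      List<Row> out = new ArrayList<>(held);
      held.clear();
      return out;
    }
  }

  // Batch-end model: emits each micro-batch's rows immediately. Only correct if every
  // commit's changes arrive within a single micro-batch.
  static List<Row> batchEnd(List<Row> batch) {
    batch.forEach(EmissionDelay::requireTimestamp);
    return List.copyOf(batch);
  }

  public static void main(String[] args) {
    List<Row> batch1 = List.of(new Row("k1", 1L));
    List<Row> batch2 = List.of(new Row("k2", 2L));
    WatermarkEmitter w = new WatermarkEmitter();
    System.out.println("watermark, batch 1: " + w.onBatch(batch1));
    System.out.println("watermark, batch 2: " + w.onBatch(batch2));
    System.out.println("watermark, flush:   " + w.flush());
    System.out.println("batch-end, batch 1: " + batchEnd(batch1));
  }
}
```

In this model the watermark emitter returns nothing for the first batch and only releases `k1` when the commit at timestamp 2 arrives, which is the one-commit delay the comment refers to.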



-- 
This is an automated message from the Apache Git Service.