gengliangwang commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3175594715


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connector.catalog.Changelog
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * StatefulProcessor that incrementalises CDC net-change computation for streaming reads.
+ *
+ * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
+ * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
+ * extract the first and last events per row identity, then applies the SPIP collapse
+ * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
+ * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+ *
+ * This processor reproduces the same semantics with `transformWithState`. Per-row-identity
+ * state stores the first event ever observed and the most-recent event observed; an event
+ * time timer keyed on `_commit_timestamp` advances with each batch and fires once the
+ * global watermark passes the latest event time observed for the key, at which point the
+ * SPIP matrix is evaluated and the net result is emitted.
+ *
+ * Output schema: identical to the connector's changelog schema.
+ *
+ * Documented limitation: row identities only touched in the latest observed commit do not
+ * emit until a later commit (with strictly greater `_commit_timestamp`) advances the
+ * watermark past them, or the source terminates. End-of-stream flushes all pending
+ * timers, so bounded streams produce the same output as the corresponding batch read.
+ *
+ * @param inputSchema    schema of the rows fed into this processor; the connector's
+ *                       changelog schema (data columns + `_change_type` +
+ *                       `_commit_version` + `_commit_timestamp`) optionally extended with
+ *                       rowId helper columns added by
+ *                       [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]].
+ * @param computeUpdates whether `(existedBefore, existsAfter) = (true, true)` should be
+ *                       relabeled as `update_preimage` / `update_postimage` (true) or kept
+ *                       as `delete` / `insert` (false), matching the batch contract.
+ */
+private[analysis] class CdcNetChangesStatefulProcessor(
+    inputSchema: StructType,
+    computeUpdates: Boolean)
+  extends StatefulProcessor[Row, Row, Row] {
+
+  @transient private var firstEvent: ValueState[Row] = _
+  @transient private var lastEvent: ValueState[Row] = _
+
+  // Hoisted out of `relabel` so we don't pay a linear `fieldIndex` scan per emitted row.
+  private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type")
+
+  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
+    val handle = getHandle
+    val rowEncoder: Encoder[Row] = ExpressionEncoder(inputSchema)
+    firstEvent = handle.getValueState[Row]("firstEvent", rowEncoder, TTLConfig.NONE)
+    lastEvent = handle.getValueState[Row]("lastEvent", rowEncoder, TTLConfig.NONE)
+  }
+
+  override def handleInputRows(
+      key: Row,
+      inputRows: Iterator[Row],
+      timerValues: TimerValues): Iterator[Row] = {
+    val handle = getHandle
+    val sorted = inputRows.toSeq.sortBy { row =>
+      val v = row.getAs[Long]("_commit_version")
+      val ct = row.getAs[String]("_change_type")
+      val rank = ct match {
+        case Changelog.CHANGE_TYPE_UPDATE_PREIMAGE | Changelog.CHANGE_TYPE_DELETE => 0
+        case Changelog.CHANGE_TYPE_INSERT | Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE => 1
+        case _ => throw new SparkException(
+          errorClass = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE",
+          messageParameters = Map.empty,
+          cause = null)
+      }
+      (v, rank)
+    }
+    if (sorted.isEmpty) return Iterator.empty
+
+    if (!firstEvent.exists()) {
+      firstEvent.update(sorted.head)
+    }
+    lastEvent.update(sorted.last)
+
+    // Re-arm the per-key event-time timer to the latest observed `_commit_timestamp`.
+    // Without dropping any existing timers we'd risk an earlier timer firing first and
+    // emitting state that later events would then re-populate, producing duplicate
+    // output for the same row identity.
+    //
+    // A NULL `_commit_timestamp` cannot be turned into a timer epoch and would NPE on
+    // `getTime()`. The `Changelog` Javadoc requires non-NULL `_commit_timestamp` on
+    // streaming reads engaging post-processing, so we surface the contract violation
+    // with a clear error class rather than failing the micro-batch with an opaque NPE.
+    val ts = sorted.last.getAs[java.sql.Timestamp]("_commit_timestamp")
+    if (ts == null) {
+      throw new SparkException(
+        errorClass = "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP",
+        messageParameters = Map.empty,
+        cause = null)
+    }
+    val newTimerMs = ts.getTime
+    val existing = handle.listTimers().toList
+    existing.foreach(handle.deleteTimer)
+    handle.registerTimer(newTimerMs)
+
+    Iterator.empty
+  }
+
+  override def handleExpiredTimer(

Review Comment:
   Updated the docs to be honest about this. The streaming netChanges path is 
intentionally incremental (watermark-window-scoped) and cannot match batch 
range-scoped netChanges in the general case -- once a row has been emitted 
downstream we cannot retract it. Bringing the streaming output into true parity 
with batch range-scoped netChanges would require holding per-rowId state for 
the entire query lifetime (no eviction), which is unbounded state and not 
viable for a long-running stream.
   
   Removed the inaccurate "bounded streams produce the same output as the 
corresponding batch read" claim from `CdcNetChangesStatefulProcessor`, 
`ResolveChangelogTable.addStreamingNetChangeComputation`, and the public 
`DataStreamReader.changes` Scaladoc, and added the v1/v2/v3 example showing 
where the two collapses diverge.
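   
   For illustration only, a hypothetical sketch (plain Scala, not the Spark API) of one way the two collapses can diverge. The `Event` type, the `collapse` helper, and the concrete events are assumptions made up for this sketch, as is the reading of the matrix in which `(false, false)` emits nothing; `computeUpdates = false` is assumed throughout. A row is inserted at v1, an unrelated commit v2 advances the watermark so the insert is emitted and its state is dropped, and the row is deleted at v3.
   
   ```scala
   // Hypothetical model of the collapse logic only; none of these names exist in Spark.
   final case class Event(version: Long, changeType: String)

   object NetChangeDivergence {
     // Same ranking idea as the processor: "before" images sort ahead of "after" images.
     private def rank(changeType: String): Int = changeType match {
       case "update_preimage" | "delete" => 0
       case "insert" | "update_postimage" => 1
       case other => throw new IllegalArgumentException(s"unexpected change type: $other")
     }

     // Assumed collapse matrix on (existedBefore, existsAfter), with computeUpdates = false.
     def collapse(events: Seq[Event]): Seq[String] =
       if (events.isEmpty) Seq.empty
       else {
         val sorted = events.sortBy(e => (e.version, rank(e.changeType)))
         val existedBefore = rank(sorted.head.changeType) == 0
         val existsAfter = rank(sorted.last.changeType) == 1
         (existedBefore, existsAfter) match {
           case (false, false) => Seq.empty
           case (false, true) => Seq("insert")
           case (true, false) => Seq("delete")
           case (true, true) => Seq("delete", "insert")
         }
       }

     val v1: Event = Event(1L, "insert")
     val v3: Event = Event(3L, "delete")

     // Batch: the whole range v1..v3 is collapsed at once, so insert + delete cancel out.
     val batch: Seq[String] = collapse(Seq(v1, v3))

     // Streaming: v1 is emitted once the watermark passes it, and v3 is collapsed
     // on its own afterwards because the earlier output cannot be retracted.
     val streaming: Seq[String] = collapse(Seq(v1)) ++ collapse(Seq(v3))
   }
   ```
   
   Under these assumptions `batch` is empty while `streaming` is `Seq("insert", "delete")`: downstream sees the row appear and then disappear instead of seeing nothing.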
   
   Will follow up separately with a multi-batch test once 
`InMemoryChangelogMicroBatchStream` supports adding rows after `start()` -- 
today the scan captures the row list at `toMicroBatchStream` time, so all rows 
arrive in a single micro-batch.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala:
##########
@@ -0,0 +1,168 @@
+  override def handleInputRows(
+      key: Row,
+      inputRows: Iterator[Row],
+      timerValues: TimerValues): Iterator[Row] = {
+    val handle = getHandle
+    val sorted = inputRows.toSeq.sortBy { row =>
+      val v = row.getAs[Long]("_commit_version")

Review Comment:
   Tightened the contract instead of generalizing the cast: the `Changelog` 
Javadoc already said `_commit_version` is LONG, but 
`ChangelogTable.validateSchema` was permissive. Now `_commit_version` must be 
`LongType` -- enforced in `validateSchema` so any other declared type fails 
analysis with `INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE` rather than 
failing only the streaming path with a runtime ClassCastException. 
`ChangelogResolutionSuite` updated to assert `IntegerType` / `StringType` are 
now rejected.
   
   This keeps the batch and streaming paths consistent (batch's generic 
Catalyst sort and the streaming `getAs[Long]` agree on the same input type) and 
matches the existing comment "Spark post-processing compares versions as 
primitive longs" in the Javadoc.
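   
   A rough sketch of the trade-off, using hypothetical stand-ins rather than the real `ChangelogTable.validateSchema` or Spark's type classes. `ColType`, `validate`, and `unsafeVersion` are names invented for this sketch, and the missing-column branch is an assumption. The first function models rejecting a bad declared type up front; the second models what the streaming path would otherwise hit when unboxing a non-`Long` value.
   
   ```scala
   // Hypothetical stand-ins for declared column types; not org.apache.spark.sql.types.
   sealed trait ColType
   case object LongT extends ColType
   case object IntT extends ColType
   case object StringT extends ColType

   object CommitVersionCheck {
     // Up-front check: any declared type other than LongT is rejected before execution.
     def validate(schema: Map[String, ColType]): Either[String, Unit] =
       schema.get("_commit_version") match {
         case Some(LongT) => Right(())
         case Some(other) =>
           Left(s"INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE: _commit_version is $other")
         // Assumption for this sketch: a missing column is also reported as an error.
         case None => Left("_commit_version is missing")
       }

     // Without the check: unboxing a boxed Integer as Long throws ClassCastException.
     def unsafeVersion(value: Any): Long = value.asInstanceOf[Long]
   }
   ```
   
   Here `validate(Map("_commit_version" -> IntT))` returns a `Left`, whereas calling `unsafeVersion` on a boxed `Integer` throws only when the value is actually read.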


