gengliangwang commented on code in PR #55637:
URL: https://github.com/apache/spark/pull/55637#discussion_r3175606894
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connector.catalog.Changelog
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * StatefulProcessor that incrementalises CDC net-change computation for streaming reads.
+ *
+ * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
+ * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
+ * extract the first and last events per row identity, then applies the SPIP collapse
+ * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
+ * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
+ *
+ * This processor reproduces the same semantics with `transformWithState`. Per-row-identity
+ * state stores the first event ever observed and the most-recent event observed; an event
+ * time timer keyed on `_commit_timestamp` advances with each batch and fires once the
+ * global watermark passes the latest event time observed for the key, at which point the
+ * SPIP matrix is evaluated and the net result is emitted.
+ *
+ * Output schema: identical to the connector's changelog schema.
+ *
+ * Documented limitation: row identities only touched in the latest observed commit do not
+ * emit until a later commit (with strictly greater `_commit_timestamp`) advances the
+ * watermark past them, or the source terminates. End-of-stream flushes all pending
+ * timers, so bounded streams produce the same output as the corresponding batch read.
+ *
+ * @param inputSchema schema of the rows fed into this processor; the connector's
+ *                    changelog schema (data columns + `_change_type` +
+ *                    `_commit_version` + `_commit_timestamp`) optionally extended with
+ *                    rowId helper columns added by
+ *                    [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]].
+ * @param computeUpdates whether `(existedBefore, existsAfter) = (true, true)` should be
+ *                       relabeled as `update_preimage` / `update_postimage` (true) or kept
+ *                       as `delete` / `insert` (false), matching the batch contract.
+ */
+private[analysis] class CdcNetChangesStatefulProcessor(
+    inputSchema: StructType,
+    computeUpdates: Boolean)
+  extends StatefulProcessor[Row, Row, Row] {
+
+  @transient private var firstEvent: ValueState[Row] = _
+  @transient private var lastEvent: ValueState[Row] = _
+
+  // Hoisted out of `relabel` so we don't pay a linear `fieldIndex` scan per emitted row.
+  private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type")
+
+  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
+    val handle = getHandle
+    val rowEncoder: Encoder[Row] = ExpressionEncoder(inputSchema)
+    firstEvent = handle.getValueState[Row]("firstEvent", rowEncoder, TTLConfig.NONE)
+    lastEvent = handle.getValueState[Row]("lastEvent", rowEncoder, TTLConfig.NONE)
+  }
+
+  override def handleInputRows(
+      key: Row,
+      inputRows: Iterator[Row],
+      timerValues: TimerValues): Iterator[Row] = {
+    val handle = getHandle
+    val sorted = inputRows.toSeq.sortBy { row =>
+      val v = row.getAs[Long]("_commit_version")

Review Comment:
Correction on my previous reply: rather than restricting `_commit_version` to `LongType`, c6ccf90 generalizes both paths so the column accepts any atomic orderable type (`LongType`, `StringType`, `IntegerType`, `TimestampType`, ...); only complex types (`ArrayType`, `MapType`, `StructType`) are rejected by `ChangelogTable.validateSchema`.

`CdcNetChangesStatefulProcessor` now uses a generic `Ordering[Row]` that compares `_commit_version` through its boxed Java `Comparable` (which works uniformly for every atomic type), composed with the existing change-type rank tiebreaker. The batch path was already type-agnostic via Catalyst's `SortOrder` on the same attribute, so both paths now agree.

The `Changelog` Javadoc was updated to spell out this contract ("atomic orderable type, e.g. LongType, StringType, IntegerType, TimestampType"), and `ChangelogResolutionSuite` was rewritten to assert that (a) atomic types pass and (b) complex types fail with `INVALID_COLUMN_TYPE`.
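For readers following the ordering change, here is a minimal, Spark-free sketch of the idea described in the comment: compare `_commit_version` through `java.lang.Comparable` on the boxed value, then break ties with a change-type rank. `ChangeEvent`, `CommitVersionOrdering`, and the rank values (old-image events before new-image events within a commit) are hypothetical stand-ins, not the code in c6ccf90. The sketch also assumes versions are non-null and that both sides of a comparison share one runtime class.

```scala
// Hypothetical stand-in for the input Row: only the two columns the ordering reads.
// In the processor these values would presumably be read from the Row itself.
final case class ChangeEvent(commitVersion: Any, changeType: String)

object CommitVersionOrdering {
  // Assumed ranks (illustrative, not the PR's constants): within one commit, events that
  // describe the old row image sort before events that describe the new one.
  private def changeTypeRank(changeType: String): Int = changeType match {
    case "delete" | "update_preimage"  => 0
    case "insert" | "update_postimage" => 1
    case other => throw new IllegalArgumentException(s"unknown _change_type: $other")
  }

  // Compares two boxed atomic values (java.lang.Long, String, java.lang.Integer,
  // java.sql.Timestamp, ...) through java.lang.Comparable, with no per-type match.
  // Assumes both values are non-null and share the same runtime class.
  private def compareVersions(a: Any, b: Any): Int =
    a.asInstanceOf[Comparable[AnyRef]].compareTo(b.asInstanceOf[AnyRef])

  // Primary key: _commit_version; tiebreaker: change-type rank.
  val eventOrdering: Ordering[ChangeEvent] = new Ordering[ChangeEvent] {
    override def compare(x: ChangeEvent, y: ChangeEvent): Int = {
      val byVersion = compareVersions(x.commitVersion, y.commitVersion)
      if (byVersion != 0) byVersion
      else Integer.compare(changeTypeRank(x.changeType), changeTypeRank(y.changeType))
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      ChangeEvent(2L, "update_postimage"),
      ChangeEvent(1L, "insert"),
      ChangeEvent(2L, "update_preimage"))
    // prints List(ChangeEvent(1,insert), ChangeEvent(2,update_preimage), ChangeEvent(2,update_postimage))
    println(events.sorted(eventOrdering))
  }
}
```

One consequence of delegating to `Comparable` is worth keeping in mind: a `StringType` version compares lexicographically, so `"v10"` sorts before `"v9"`. A connector that uses string versions would presumably need them to be lexicographically ordered (for example, zero-padded) for first/last-event selection to follow commit order.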
