SanJSp commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3137751813
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ)
}
+
+object ChangelogTable {
+
+ def validateSchema(cl: Changelog): Unit = {
+ val byName = cl.columns.map(c => c.name -> c).toMap
+ def check(name: String, expected: DataType*): Unit = {
+ val col = byName.getOrElse(name,
+ throw QueryCompilationErrors.changelogMissingColumnError(cl.name, name))
+ if (expected.nonEmpty && col.dataType != expected.head) {
+ throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+ cl.name, name, expected.head.sql, col.dataType.sql)
+ }
+ }
+ check("_change_type", StringType)
+ check("_commit_version") // connector-defined, any type accepted
+ check("_commit_timestamp", TimestampType)
+
+ // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for
+ // connectors that haven't opted in. Translate that into "not declared" so we can
+ // reason about it as Option/empty-array below.
+ val rowIds: Array[NamedReference] = try cl.rowId() catch {
+ case _: UnsupportedOperationException => Array.empty
+ }
+ val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch {
+ case _: UnsupportedOperationException => None
+ }
+
+ // Capability-driven presence checks: a connector that advertises a capability which
+ // requires row identity or row versioning must actually expose those references.
+ // Otherwise post-processing would crash with an UnsupportedOperationException at
+ // runtime instead of producing a clean AnalysisException here.
+ val needsRowId = cl.containsCarryoverRows() ||
+ cl.representsUpdateAsDeleteAndInsert() ||
+ cl.containsIntermediateChanges()
+ if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+ throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+ }
Review Comment:
Done, using your option 2 from the [NULL-safety thread on
#55426](https://github.com/apache/spark/pull/55426#discussion_r3120231839).
I added `count(rowVersion)` to the carry-over Window as a third aggregate
alongside `min` and `max` (no extra Window operator, no additional shuffle).
The filter now requires `_rv_cnt = 2 AND _min_rv = _max_rv`. A NULL rowVersion
on either side fails the count check, so the pair falls through as a raw
delete+insert instead of being silently dropped. The check is nesting-agnostic.
The implementation and a regression test
(`"NULL rowVersion on one side is NOT silently dropped as carry-over"`) are in #55508.
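To illustrate why the count check matters, here is a minimal sketch in plain Scala (no Spark dependency). It assumes the earlier filter compared only `min` and `max`. The names `Change` and `CarryOverSketch` are hypothetical, and `None` stands in for SQL NULL. It is not the code from #55508.

```scala
// Hypothetical model of one change row; None stands for a SQL NULL rowVersion.
final case class Change(changeType: String, rowVersion: Option[Long])

object CarryOverSketch {
  // Imitates SQL aggregates over one rowId partition: count/min/max skip NULLs.
  // requireCount = false models the assumed min/max-only filter;
  // requireCount = true models `_rv_cnt = 2 AND _min_rv = _max_rv`.
  def isCarryOver(pair: Seq[Change], requireCount: Boolean): Boolean = {
    val versions = pair.flatMap(_.rowVersion)   // NULLs are dropped here
    val rvCnt = versions.size                   // count(rowVersion)
    // With no non-NULL input, min and max are NULL and the comparison is not true.
    val minEqualsMax = versions.nonEmpty && versions.min == versions.max
    if (requireCount) rvCnt == 2 && minEqualsMax else minEqualsMax
  }
}
```

With one NULL side, `min` and `max` both collapse to the single non-NULL value, so only the count distinguishes the pair from a genuine carry-over.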
On the rowId asymmetry: rowId nullability is not schema-checked. An
analogous silent-drop path exists (multiple NULL-rowId rows collapse into one
Window partition via SQL NULL-group semantics), but the trigger surface is
narrower than for rowVersion and a `count()=2`-style runtime guard does not
port cleanly.
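A small sketch of the NULL-rowId collapse, again in plain Scala with hypothetical names (`ChangeRow`, `NullRowIdSketch`). `groupBy` on an `Option` key imitates how a Window `PARTITION BY` places all NULL keys in a single partition.

```scala
// Hypothetical change row; None stands for a SQL NULL rowId.
final case class ChangeRow(rowId: Option[String], changeType: String, payload: String)

object NullRowIdSketch {
  // All rows whose rowId is None land in the same group, just as NULL
  // partition keys share one Window partition.
  def partitionByRowId(rows: Seq[ChangeRow]): Map[Option[String], Seq[ChangeRow]] =
    rows.groupBy(_.rowId)
}
```

Two unrelated rows with a NULL rowId (say, a delete of one record and an insert of another) end up in the same group and can look like a delete+insert pair.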
A top-level-only schema check would cover `id` but miss, for example,
Delta's nested `_metadata.row_id`. This asymmetric coverage feels worse than no
coverage at all.
We can
- either do a full schema walk through the metadata columns, covering both
top-level and nested fields (how deep should it go? All the way, I think),
- or leave it unenforced and trust the Javadoc contract.
Currently the latter is implemented. I'm open to the recursive column
check but could use some input here. What do you think, @gengliangwang?
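For discussion, a minimal sketch of what the recursive check could look like. It uses simplified, hypothetical stand-ins (`FieldType`, `Struct`, `Field`) rather than Spark's `StructType`/`StructField`, assumes exact (case-sensitive) name matching, and treats a reference as non-nullable only if every segment along the path is non-nullable.

```scala
// Hypothetical, simplified schema model; not Spark's actual types.
sealed trait FieldType
case object Leaf extends FieldType
final case class Struct(fields: Seq[Field]) extends FieldType
final case class Field(name: String, nullable: Boolean, tpe: FieldType)

object NullabilityWalk {
  // True only if every segment of `path` exists and is declared non-nullable.
  def isNonNullable(schema: Struct, path: List[String]): Boolean = path match {
    case Nil => false // an empty reference names no column
    case name :: rest =>
      schema.fields.find(_.name == name) match {
        case Some(f) if !f.nullable =>
          (f.tpe, rest) match {
            case (_, Nil)       => true                    // reached the referenced field
            case (s: Struct, _) => isNonNullable(s, rest)  // descend one level
            case (Leaf, _)      => false                   // path continues past a leaf
          }
        case _ => false // missing or nullable segment
      }
  }
}
```

A top-level-only check corresponds to calling this with single-segment paths. The recursion is what would cover a reference such as `_metadata.row_id`.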
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]