SanJSp commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3137751813


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a capability which
+    // requires row identity or row versioning must actually expose those references.
+    // Otherwise post-processing would crash with an UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }

Review Comment:
   Done, using your option 2 from the [NULL-safety thread on #55426](https://github.com/apache/spark/pull/55426#discussion_r3120231839). I added `count(rowVersion)` to the carry-over Window as a third aggregate alongside `min` and `max`, so there is no extra Window operator and no additional shuffle. The filter now requires `_rv_cnt = 2 AND _min_rv = _max_rv`. Because `count` skips NULLs, a NULL rowVersion on either side fails the count check, and the pair falls through as a raw delete+insert instead of being silently dropped. The check is nesting-agnostic. The implementation and a regression test (`"NULL rowVersion on one side is NOT silently dropped as carry-over"`) are in #55508.
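
To illustrate why the count matters, here is a minimal plain-Scala sketch of the filter, not the actual code in #55508. `ChangeRow`, `CarryoverGuardSketch`, and both method names are hypothetical, and `Option` stands in for a nullable rowVersion. It assumes the earlier filter compared only `min` and `max`, which, like `count`, ignore NULL inputs in SQL.

```scala
object CarryoverGuardSketch {
  // Hypothetical stand-in for one changelog row inside a carry-over Window partition.
  final case class ChangeRow(changeType: String, rowVersion: Option[Long])

  // Mirrors `_rv_cnt = 2 AND _min_rv = _max_rv`: NULL (None) versions are skipped
  // by all three aggregates, so a NULL on either side leaves the count at 1.
  def isCarryover(partition: Seq[ChangeRow]): Boolean = {
    val versions = partition.flatMap(_.rowVersion)
    versions.size == 2 && versions.min == versions.max
  }

  // Assumed shape of a filter without the count: one surviving non-NULL version
  // trivially satisfies min == max, so the pair would be treated as carry-over.
  def minEqualsMaxOnly(partition: Seq[ChangeRow]): Boolean = {
    val versions = partition.flatMap(_.rowVersion)
    versions.nonEmpty && versions.min == versions.max
  }
}
```

With a delete whose rowVersion is NULL and an insert at version 5, `minEqualsMaxOnly` accepts the pair while `isCarryover` rejects it, so the pair is kept as a raw delete+insert.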
   
   On the rowId asymmetry: rowId nullability is not schema-checked. An analogous silent-drop path exists (multiple NULL-rowId rows collapse into one Window partition via SQL NULL-group semantics), but the trigger surface is narrower than for rowVersion, and a `count()=2`-style runtime guard does not port cleanly.
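
The partition collapse can be seen in a small plain-Scala sketch. `IdRow` and `NullRowIdSketch` are hypothetical names, and `groupBy` on an `Option` key is used only as an analogy for `PARTITION BY` grouping NULL keys together; it is not how the Window is implemented.

```scala
object NullRowIdSketch {
  // Hypothetical stand-in for a changelog row with a nullable rowId.
  final case class IdRow(rowId: Option[Long], changeType: String)

  // All rows whose rowId is None land in the same group, just as rows with a
  // NULL partition key share one Window partition in SQL.
  def partitionByRowId(rows: Seq[IdRow]): Map[Option[Long], Seq[IdRow]] =
    rows.groupBy(_.rowId)
}
```

Two otherwise unrelated rows with a NULL rowId end up side by side in one group, which is the precondition for the silent-drop path described above.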
   
   A top-level-only schema check would cover `id` but miss, for example, Delta's nested `_metadata.row_id`. Such asymmetric coverage feels worse than no coverage at all.
   We can either:
   - do a full schema walk through the metadata columns, covering both top-level and nested fields (I think we would recurse all the way down, right?), or
   - leave it unenforced and trust the Javadoc contract.
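
For the first option, a recursive check could look roughly like the sketch below. It is written against small stand-in types (`FieldType`, `Field`, `StructT`, all hypothetical) rather than Spark's `StructType`, so that it runs on its own. It also assumes that a nullable parent struct makes every field beneath it effectively nullable, which is a design choice and not something this PR settles.

```scala
object RowIdSchemaWalkSketch {
  // Hypothetical stand-ins for a schema tree; not Spark's StructType/StructField.
  sealed trait FieldType
  final case class Field(name: String, dataType: FieldType, nullable: Boolean)
  case object LongT extends FieldType
  final case class StructT(fields: Seq[Field]) extends FieldType

  // Resolves a field path such as Seq("_metadata", "row_id").
  // Some(true): some field along the path is nullable. Some(false): none is.
  // None: the path is empty or does not resolve against the schema.
  def pathIsNullable(schema: StructT, path: Seq[String]): Option[Boolean] =
    path.headOption.flatMap { head =>
      schema.fields.find(_.name == head).flatMap { field =>
        val rest = path.tail
        if (rest.isEmpty) Some(field.nullable)
        else field.dataType match {
          case nested: StructT => pathIsNullable(nested, rest).map(_ || field.nullable)
          case _ => None // path continues past a non-struct field
        }
      }
    }
}
```

Recursing over the whole path means top-level `id` and nested `_metadata.row_id` are handled by the same code, which avoids the asymmetric coverage mentioned above.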
   
   The current implementation takes the latter approach. I'm open to the recursive column check but would appreciate some input here. What do you think, @gengliangwang?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

