gengliangwang commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3140648242


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a capability which
+    // requires row identity or row versioning must actually expose those references.
+    // Otherwise post-processing would crash with an UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }
+
+    val needsRowVersion = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert()
+    if (needsRowVersion && rowVersionRef.isEmpty) {
+      throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name)
+    }
+
+    // Schema constraints on rowVersion: must be a top-level non-nullable column.
+    // Nullable rowVersions break carry-over detection (NULL = NULL is unknown, so a
+    // delete+insert pair would be misclassified as a real update).
+    rowVersionRef.foreach { ref =>
+      val fieldNames = ref.fieldNames()
+      if (fieldNames.length != 1) {

Review Comment:
   There is no new commit since the review yesterday.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to