SanJSp commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3137935393


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a capability which
+    // requires row identity or row versioning must actually expose those references.
+    // Otherwise post-processing would crash with an UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }

Review Comment:
   For now I'd defer to a later PR. Is that fine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to