gengliangwang commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3132688322


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap

Review Comment:
   Duplicate column names in the connector schema are silently dropped (last 
write wins in `toMap`). A connector with a bug that emits two `_change_type` 
columns wouldn't trip the validator — it would surface later as an 
attribute-resolution ambiguity. Worth rejecting duplicates here. (Not a 
blocker.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {

Review Comment:
   Consider scoping this to `private[v2]`: `ChangelogTable` is documented as NOT 
a connector API surface, and `validateSchema` is only called from the primary 
constructor. Leaving it public invites external callers to bypass or 
double-invoke the validation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing 
UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so 
we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a 
capability which
+    // requires row identity or row versioning must actually expose those 
references.
+    // Otherwise post-processing would crash with an 
UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }
+
+    val needsRowVersion = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert()
+    if (needsRowVersion && rowVersionRef.isEmpty) {
+      throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name)
+    }
+
+    // Schema constraints on rowVersion: must be a top-level non-nullable 
column.
+    // Nullable rowVersions break carry-over detection (NULL = NULL is 
unknown, so a
+    // delete+insert pair would be misclassified as a real update).
+    rowVersionRef.foreach { ref =>
+      val fieldNames = ref.fieldNames()
+      if (fieldNames.length != 1) {
+        throw QueryCompilationErrors.changelogNestedRowVersionError(
+          cl.name, fieldNames.mkString("."))
+      }
+      val columnName = fieldNames(0)
+      val col = byName.getOrElse(columnName,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
columnName))

Review Comment:
   When `rowVersion()` points to a column that isn't in `columns()`, we throw 
`MISSING_COLUMN` with the referenced column name — the user sees "Required 
column `<x>` is missing", as if a required metadata column were missing. The 
real cause is "`rowVersion` references a column not in the schema." Consider a 
dedicated sub-class (e.g. `ROW_VERSION_COLUMN_NOT_FOUND`) or at least a 
reworded message. This path also has no test.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing 
UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so 
we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a 
capability which
+    // requires row identity or row versioning must actually expose those 
references.
+    // Otherwise post-processing would crash with an 
UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }

Review Comment:
   `rowId` columns are not checked for non-nullability, even though (a) the 
`Changelog.rowId()` Javadoc requires "Each referenced column must be 
non-nullable", and (b) the peer row-level-operations path validates this via 
`RewriteRowLevelCommand.resolveRowIdAttrs` with `NULLABLE_ROW_ID_ATTRIBUTES`. 
Consider adding a parallel `NULLABLE_ROW_ID` sub-class (or at least stating 
explicitly that `rowId` column validation is deferred to a later PR). As 
written, `rowVersion` is validated for nullability and top-level-ness, while 
`rowId` is validated for presence only.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing 
UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so 
we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a 
capability which
+    // requires row identity or row versioning must actually expose those 
references.
+    // Otherwise post-processing would crash with an 
UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }
+
+    val needsRowVersion = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert()
+    if (needsRowVersion && rowVersionRef.isEmpty) {
+      throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name)
+    }
+
+    // Schema constraints on rowVersion: must be a top-level non-nullable 
column.
+    // Nullable rowVersions break carry-over detection (NULL = NULL is 
unknown, so a
+    // delete+insert pair would be misclassified as a real update).
+    rowVersionRef.foreach { ref =>
+      val fieldNames = ref.fieldNames()
+      if (fieldNames.length != 1) {

Review Comment:
   The top-level requirement is new — the `Changelog.rowVersion()` Javadoc only 
says "non-nullable". A connector that reads the contract and returns a nested 
`NamedReference` will fail with `NESTED_ROW_VERSION` but get no hint from the 
API docs. Please either (a) update the `Changelog.rowVersion()` Javadoc to 
state that the reference must be a top-level column of `columns()`, or (b) 
remove this check and accept nested references. The same applies (by 
extension) to `rowId` if top-level-ness is also intended there.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -35,6 +38,11 @@ case class ChangelogTable(
     changelog: Changelog,
     changelogInfo: ChangelogInfo) extends Table with SupportsRead {
 
+  // Validate the connector returned a schema with the required CDC metadata 
columns

Review Comment:
   Minor grammar — "Validate **that** the connector …".
   
   ```suggestion
     // Validate that the connector returned a schema with the required CDC 
metadata columns
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala:
##########
@@ -203,4 +206,159 @@ class ChangelogResolutionSuite extends QueryTest with 
SharedSparkSession {
     assert(range.startingVersion() == "1")
     assert(range.endingVersion().get() == "5")
   }
+
+  // 
===========================================================================
+  // Generic changelog schema validation
+  // 
===========================================================================
+
+  private def stubInfo(): ChangelogInfo = new ChangelogInfo(
+    new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true, 
true),
+    ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+    false)
+
+  private def cl(name: String, cols: (String, 
org.apache.spark.sql.types.DataType)*)
+      : TestChangelog = {
+    new TestChangelog(name, cols.map { case (n, t) => Column.create(n, t) 
}.toArray)
+  }
+
+  private def missing(columnName: String): Map[String, String] =
+    Map("changelogName" -> "bad_cl", "columnName" -> columnName)
+
+  private def wrongType(columnName: String, expected: String, actual: String)
+      : Map[String, String] = Map(
+    "changelogName" -> "bad_cl",
+    "columnName" -> columnName,
+    "expectedType" -> expected,
+    "actualType" -> actual)
+
+  // Valid metadata tuples; tests swap one of these out to create broken 
schemas.
+  private val validChangeType = "_change_type" -> StringType
+  private val validVersion = "_commit_version" -> LongType
+  private val validTimestamp = "_commit_timestamp" -> TimestampType
+
+  test("ChangelogTable - missing _change_type column throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(cl("bad_cl", validVersion, validTimestamp), stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN",
+      parameters = missing("_change_type"))
+  }
+
+  test("ChangelogTable - missing _commit_version column throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(cl("bad_cl", validChangeType, validTimestamp), 
stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN",
+      parameters = missing("_commit_version"))
+  }
+
+  test("ChangelogTable - missing _commit_timestamp column throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(cl("bad_cl", validChangeType, validVersion), stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN",
+      parameters = missing("_commit_timestamp"))
+  }
+
+  test("ChangelogTable - wrong _change_type data type throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(
+          cl("bad_cl", "_change_type" -> IntegerType, validVersion, 
validTimestamp),
+          stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE",
+      parameters = wrongType("_change_type", "STRING", "INT"))
+  }
+
+  test("ChangelogTable - wrong _commit_timestamp data type throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(
+          cl("bad_cl", validChangeType, validVersion, "_commit_timestamp" -> 
LongType),
+          stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE",
+      parameters = wrongType("_commit_timestamp", "TIMESTAMP", "BIGINT"))
+  }
+
+  test("ChangelogTable - _commit_version type is connector-defined (any type 
accepted)") {
+    Seq(IntegerType, LongType, StringType).foreach { versionType =>
+      ChangelogTable(
+        cl("any_cl", validChangeType, "_commit_version" -> versionType, 
validTimestamp),
+        stubInfo())
+    }
+  }
+
+  test("ChangelogTable - valid schema with data columns passes") {
+    ChangelogTable(
+      cl("good_cl", "id" -> LongType, "name" -> StringType,
+        validChangeType, validVersion, validTimestamp),
+      stubInfo())
+  }
+
+  test("ChangelogTable - nullable rowVersion column fails") {
+    val cl = new TestChangelog(
+      "bad_cl",
+      Array(
+        Column.create("id", LongType, false),
+        Column.create("_change_type", StringType),
+        Column.create("_commit_version", LongType),
+        Column.create("_commit_timestamp", TimestampType),
+        Column.create("row_commit_version", LongType)),
+      carryoverRows = true,
+      rowIdRefs = Array(FieldReference.column("id")),
+      rowVersionRef = Some(FieldReference.column("row_commit_version")))
+    checkError(
+      intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) },
+      condition = "INVALID_CHANGELOG_SCHEMA.NULLABLE_ROW_VERSION",
+      parameters = Map(
+        "changelogName" -> "bad_cl",
+        "columnName" -> "row_commit_version"))
+  }
+
+  test("ChangelogTable - non-nullable rowVersion column passes") {
+    val cl = new TestChangelog(
+      "good_cl",
+      Array(
+        Column.create("id", LongType, false),
+        Column.create("_change_type", StringType),
+        Column.create("_commit_version", LongType),
+        Column.create("_commit_timestamp", TimestampType),
+        Column.create("row_commit_version", LongType, false)),
+      carryoverRows = true,
+      rowIdRefs = Array(FieldReference.column("id")),
+      rowVersionRef = Some(FieldReference.column("row_commit_version")))
+    ChangelogTable(cl, stubInfo())
+  }

Review Comment:
   Three sub-classes introduced in this PR have no test case: `MISSING_ROW_ID`, 
`MISSING_ROW_VERSION`, `NESTED_ROW_VERSION`. Capability triggers other than 
`containsCarryoverRows=true` are also untested: there is no case exercising 
`representsUpdateAsDeleteAndInsert=true` or `containsIntermediateChanges=true`. 
There is likewise no case where `rowVersion` references a column that is absent 
from `columns()`. The `TestChangelog` fixture already supports all of these — a 
few small cases would close the gap and provide the coverage the PR description 
claims is already in place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to