SanJSp commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3137360750


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3282,6 +3282,44 @@
     },
     "sqlState" : "42K03"
   },
+  "INVALID_CHANGELOG_SCHEMA" : {
+    "message" : [
+      "The Change Data Capture (CDC) schema returned by connector 
<changelogName> is invalid."
+    ],
+    "subClass" : {
+      "INVALID_COLUMN_TYPE" : {
+        "message" : [
+          "Column `<columnName>` has type <actualType>, expected 
<expectedType>."
+        ]
+      },
+      "MISSING_COLUMN" : {
+        "message" : [
+          "Required column `<columnName>` is missing."
+        ]
+      },
+      "MISSING_ROW_ID" : {
+        "message" : [
+          "Connector advertises one or more post-processing properties 
(`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`, 
`containsIntermediateChanges`) that require row identity, but 
`Changelog.rowId()` returned an empty array. Either set all three to `false`, 
or return at least one row-id `NamedReference`."

Review Comment:
   Done.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3282,6 +3282,44 @@
     },
     "sqlState" : "42K03"
   },
+  "INVALID_CHANGELOG_SCHEMA" : {
+    "message" : [
+      "The Change Data Capture (CDC) schema returned by connector 
<changelogName> is invalid."
+    ],
+    "subClass" : {
+      "INVALID_COLUMN_TYPE" : {
+        "message" : [
+          "Column `<columnName>` has type <actualType>, expected 
<expectedType>."
+        ]
+      },
+      "MISSING_COLUMN" : {
+        "message" : [
+          "Required column `<columnName>` is missing."
+        ]
+      },
+      "MISSING_ROW_ID" : {
+        "message" : [
+          "Connector advertises one or more post-processing properties 
(`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`, 
`containsIntermediateChanges`) that require row identity, but 
`Changelog.rowId()` returned an empty array. Either set all three to `false`, 
or return at least one row-id `NamedReference`."
+        ]
+      },
+      "MISSING_ROW_VERSION" : {
+        "message" : [
+          "Connector advertises `containsCarryoverRows` or 
`representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is 
not implemented. Override `rowVersion()` to return a `NamedReference` pointing 
to a non-nullable column in `Changelog.columns()`."

Review Comment:
   The entry is gone entirely. After surfacing `UnsupportedOperationException` 
directly (per your `ChangelogTable.scala:81` comment), `rowVersion()` has no 
legitimate empty state distinct from UOE, so `MISSING_ROW_VERSION` became 
unreachable.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing 
UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so 
we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }

Review Comment:
   Done. Side-effect: `MISSING_ROW_VERSION` became unreachable and was removed 
along with its helper and subclass.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -35,6 +38,11 @@ case class ChangelogTable(
     changelog: Changelog,
     changelogInfo: ChangelogInfo) extends Table with SupportsRead {
 
+  // Validate the connector returned a schema with the required CDC metadata 
columns

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing 
UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so 
we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a 
capability which
+    // requires row identity or row versioning must actually expose those 
references.
+    // Otherwise post-processing would crash with an 
UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }
+
+    val needsRowVersion = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert()
+    if (needsRowVersion && rowVersionRef.isEmpty) {
+      throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name)
+    }
+
+    // Schema constraints on rowVersion: must be a top-level non-nullable 
column.
+    // Nullable rowVersions break carry-over detection (NULL = NULL is 
unknown, so a
+    // delete+insert pair would be misclassified as a real update).
+    rowVersionRef.foreach { ref =>
+      val fieldNames = ref.fieldNames()
+      if (fieldNames.length != 1) {

Review Comment:
   Done (option b). Removed the top-level-only restriction. 
`NESTED_ROW_VERSION` error class, helper, and subclass are gone.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
 
   override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, 
MICRO_BATCH_READ)
 }
+
+object ChangelogTable {
+
+  def validateSchema(cl: Changelog): Unit = {
+    val byName = cl.columns.map(c => c.name -> c).toMap
+    def check(name: String, expected: DataType*): Unit = {
+      val col = byName.getOrElse(name,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
name))
+      if (expected.nonEmpty && col.dataType != expected.head) {
+        throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+          cl.name, name, expected.head.sql, col.dataType.sql)
+      }
+    }
+    check("_change_type", StringType)
+    check("_commit_version")           // connector-defined, any type accepted
+    check("_commit_timestamp", TimestampType)
+
+    // `rowId()` / `rowVersion()` default to throwing 
UnsupportedOperationException for
+    // connectors that haven't opted in. Translate that into "not declared" so 
we can
+    // reason about it as Option/empty-array below.
+    val rowIds: Array[NamedReference] = try cl.rowId() catch {
+      case _: UnsupportedOperationException => Array.empty
+    }
+    val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) 
catch {
+      case _: UnsupportedOperationException => None
+    }
+
+    // Capability-driven presence checks: a connector that advertises a 
capability which
+    // requires row identity or row versioning must actually expose those 
references.
+    // Otherwise post-processing would crash with an 
UnsupportedOperationException at
+    // runtime instead of producing a clean AnalysisException here.
+    val needsRowId = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert() ||
+      cl.containsIntermediateChanges()
+    if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
+      throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
+    }
+
+    val needsRowVersion = cl.containsCarryoverRows() ||
+      cl.representsUpdateAsDeleteAndInsert()
+    if (needsRowVersion && rowVersionRef.isEmpty) {
+      throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name)
+    }
+
+    // Schema constraints on rowVersion: must be a top-level non-nullable 
column.
+    // Nullable rowVersions break carry-over detection (NULL = NULL is 
unknown, so a
+    // delete+insert pair would be misclassified as a real update).
+    rowVersionRef.foreach { ref =>
+      val fieldNames = ref.fieldNames()
+      if (fieldNames.length != 1) {
+        throw QueryCompilationErrors.changelogNestedRowVersionError(
+          cl.name, fieldNames.mkString("."))
+      }
+      val columnName = fieldNames(0)
+      val col = byName.getOrElse(columnName,
+        throw QueryCompilationErrors.changelogMissingColumnError(cl.name, 
columnName))

Review Comment:
   Obsolete after removing the nested restriction: the `byName` lookup for 
`rowVersion` is gone. A stale reference hits resolution at plan-build time via 
`V2ExpressionUtils.resolveRef`.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala:
##########
@@ -203,4 +206,159 @@ class ChangelogResolutionSuite extends QueryTest with 
SharedSparkSession {
     assert(range.startingVersion() == "1")
     assert(range.endingVersion().get() == "5")
   }
+
+  // 
===========================================================================
+  // Generic changelog schema validation
+  // 
===========================================================================
+
+  private def stubInfo(): ChangelogInfo = new ChangelogInfo(
+    new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true, 
true),
+    ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
+    false)
+
+  private def cl(name: String, cols: (String, 
org.apache.spark.sql.types.DataType)*)
+      : TestChangelog = {
+    new TestChangelog(name, cols.map { case (n, t) => Column.create(n, t) 
}.toArray)
+  }
+
+  private def missing(columnName: String): Map[String, String] =
+    Map("changelogName" -> "bad_cl", "columnName" -> columnName)
+
+  private def wrongType(columnName: String, expected: String, actual: String)
+      : Map[String, String] = Map(
+    "changelogName" -> "bad_cl",
+    "columnName" -> columnName,
+    "expectedType" -> expected,
+    "actualType" -> actual)
+
+  // Valid metadata tuples; tests swap one of these out to create broken 
schemas.
+  private val validChangeType = "_change_type" -> StringType
+  private val validVersion = "_commit_version" -> LongType
+  private val validTimestamp = "_commit_timestamp" -> TimestampType
+
+  test("ChangelogTable - missing _change_type column throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(cl("bad_cl", validVersion, validTimestamp), stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN",
+      parameters = missing("_change_type"))
+  }
+
+  test("ChangelogTable - missing _commit_version column throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(cl("bad_cl", validChangeType, validTimestamp), 
stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN",
+      parameters = missing("_commit_version"))
+  }
+
+  test("ChangelogTable - missing _commit_timestamp column throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(cl("bad_cl", validChangeType, validVersion), stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN",
+      parameters = missing("_commit_timestamp"))
+  }
+
+  test("ChangelogTable - wrong _change_type data type throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(
+          cl("bad_cl", "_change_type" -> IntegerType, validVersion, 
validTimestamp),
+          stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE",
+      parameters = wrongType("_change_type", "STRING", "INT"))
+  }
+
+  test("ChangelogTable - wrong _commit_timestamp data type throws") {
+    checkError(
+      intercept[AnalysisException] {
+        ChangelogTable(
+          cl("bad_cl", validChangeType, validVersion, "_commit_timestamp" -> 
LongType),
+          stubInfo())
+      },
+      condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE",
+      parameters = wrongType("_commit_timestamp", "TIMESTAMP", "BIGINT"))
+  }
+
+  test("ChangelogTable - _commit_version type is connector-defined (any type 
accepted)") {
+    Seq(IntegerType, LongType, StringType).foreach { versionType =>
+      ChangelogTable(
+        cl("any_cl", validChangeType, "_commit_version" -> versionType, 
validTimestamp),
+        stubInfo())
+    }
+  }
+
+  test("ChangelogTable - valid schema with data columns passes") {
+    ChangelogTable(
+      cl("good_cl", "id" -> LongType, "name" -> StringType,
+        validChangeType, validVersion, validTimestamp),
+      stubInfo())
+  }
+
+  test("ChangelogTable - nullable rowVersion column fails") {
+    val cl = new TestChangelog(
+      "bad_cl",
+      Array(
+        Column.create("id", LongType, false),
+        Column.create("_change_type", StringType),
+        Column.create("_commit_version", LongType),
+        Column.create("_commit_timestamp", TimestampType),
+        Column.create("row_commit_version", LongType)),
+      carryoverRows = true,
+      rowIdRefs = Array(FieldReference.column("id")),
+      rowVersionRef = Some(FieldReference.column("row_commit_version")))
+    checkError(
+      intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) },
+      condition = "INVALID_CHANGELOG_SCHEMA.NULLABLE_ROW_VERSION",
+      parameters = Map(
+        "changelogName" -> "bad_cl",
+        "columnName" -> "row_commit_version"))
+  }
+
+  test("ChangelogTable - non-nullable rowVersion column passes") {
+    val cl = new TestChangelog(
+      "good_cl",
+      Array(
+        Column.create("id", LongType, false),
+        Column.create("_change_type", StringType),
+        Column.create("_commit_version", LongType),
+        Column.create("_commit_timestamp", TimestampType),
+        Column.create("row_commit_version", LongType, false)),
+      carryoverRows = true,
+      rowIdRefs = Array(FieldReference.column("id")),
+      rowVersionRef = Some(FieldReference.column("row_commit_version")))
+    ChangelogTable(cl, stubInfo())
+  }

Review Comment:
   Done. Added five tests covering `MISSING_ROW_ID` via 
`representsUpdateAsDeleteAndInsert` / `containsIntermediateChanges`, 
`UnsupportedOperationException` surfacing for both `rowId()` and 
`rowVersion()`, and a positive nested-refs case (Delta-style `_metadata`). 
Extended `TestChangelog` with `updateAsDeleteInsert`, `intermediateChanges`, 
`rowIdSupported` flags.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to