johanl-db commented on code in PR #55507:
URL: https://github.com/apache/spark/pull/55507#discussion_r3131119662
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3282,6 +3282,44 @@
},
"sqlState" : "42K03"
},
+ "INVALID_CHANGELOG_SCHEMA" : {
+ "message" : [
+ "The Change Data Capture (CDC) schema returned by connector
<changelogName> is invalid."
+ ],
+ "subClass" : {
+ "INVALID_COLUMN_TYPE" : {
+ "message" : [
+ "Column `<columnName>` has type <actualType>, expected
<expectedType>."
+ ]
+ },
+ "MISSING_COLUMN" : {
+ "message" : [
+ "Required column `<columnName>` is missing."
+ ]
+ },
+ "MISSING_ROW_ID" : {
+ "message" : [
+ "Connector advertises one or more post-processing properties
(`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`,
`containsIntermediateChanges`) that require row identity, but
`Changelog.rowId()` returned an empty array. Either set all three to `false`,
or return at least one row-id `NamedReference`."
Review Comment:
The last part is directed at connector developers and is not actionable for
end users; please remove it.
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3282,6 +3282,44 @@
},
"sqlState" : "42K03"
},
+ "INVALID_CHANGELOG_SCHEMA" : {
+ "message" : [
+ "The Change Data Capture (CDC) schema returned by connector
<changelogName> is invalid."
+ ],
+ "subClass" : {
+ "INVALID_COLUMN_TYPE" : {
+ "message" : [
+ "Column `<columnName>` has type <actualType>, expected
<expectedType>."
+ ]
+ },
+ "MISSING_COLUMN" : {
+ "message" : [
+ "Required column `<columnName>` is missing."
+ ]
+ },
+ "MISSING_ROW_ID" : {
+ "message" : [
+ "Connector advertises one or more post-processing properties
(`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`,
`containsIntermediateChanges`) that require row identity, but
`Changelog.rowId()` returned an empty array. Either set all three to `false`,
or return at least one row-id `NamedReference`."
+ ]
+ },
+ "MISSING_ROW_VERSION" : {
+ "message" : [
+ "Connector advertises `containsCarryoverRows` or
`representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is
not implemented. Override `rowVersion()` to return a `NamedReference` pointing
to a non-nullable column in `Changelog.columns()`."
Review Comment:
Same here: remove the last part.
Also, grammar: 'Connector advertises ... ~is `true`~' (drop the trailing "is `true`").
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala:
##########
@@ -45,3 +53,65 @@ case class ChangelogTable(
override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ,
MICRO_BATCH_READ)
}
+
+object ChangelogTable {
+
+ def validateSchema(cl: Changelog): Unit = {
+ val byName = cl.columns.map(c => c.name -> c).toMap
+ def check(name: String, expected: DataType*): Unit = {
+ val col = byName.getOrElse(name,
+ throw QueryCompilationErrors.changelogMissingColumnError(cl.name,
name))
+ if (expected.nonEmpty && col.dataType != expected.head) {
+ throw QueryCompilationErrors.changelogInvalidColumnTypeError(
+ cl.name, name, expected.head.sql, col.dataType.sql)
+ }
+ }
+ check("_change_type", StringType)
+ check("_commit_version") // connector-defined, any type accepted
+ check("_commit_timestamp", TimestampType)
+
+ // `rowId()` / `rowVersion()` default to throwing
UnsupportedOperationException for
+ // connectors that haven't opted in. Translate that into "not declared" so
we can
+ // reason about it as Option/empty-array below.
+ val rowIds: Array[NamedReference] = try cl.rowId() catch {
+ case _: UnsupportedOperationException => Array.empty
+ }
+ val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion())
catch {
+ case _: UnsupportedOperationException => None
+ }
Review Comment:
I'm on the fence here, but I think I'd rather not catch
`UnsupportedOperationException` and instead surface that error directly.
This error comes from connector developers who didn't implement the required
methods even though the capabilities they reported require them. So it doesn't
really need a user-facing error message; surfacing
`UnsupportedOperationException` would actually make it more obvious that
there's an issue with the connector itself.
The important point is that we only call these methods here when the
capabilities require them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]