gengliangwang opened a new pull request, #55567: URL: https://github.com/apache/spark/pull/55567
### What changes were proposed in this pull request?

This is **PR 1 of a split** of #55426 (see the [split suggestion](https://github.com/apache/spark/pull/55426#issuecomment-4292375876) for the full plan). Can merge in any order, but 1 (#55507) < 2 (#55508) would be preferable.

For more context, see [discussion](https://lists.apache.org/thread/dhxx6pohs7fvqc3knzhtoj4tbcgrwxts) posted to [[email protected]](https://lists.apache.org/[email protected]) and linked [SPIP](https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0#heading=h.m1700lw4wsoj).

Validates the CDC metadata columns and row-identity presence returned by a `Changelog` connector at relation construction time, and introduces a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error.

- `ChangelogTable.validateSchema`: fail-fast checks that the connector schema contains the required metadata columns (`_change_type` as StringType, `_commit_version` of connector-defined type, `_commit_timestamp` as TimestampType), and that `rowId()` returns a non-empty array when a capability requires row identity. `rowVersion()` is invoked when a capability requires it and surfaces the default `UnsupportedOperationException` directly if the connector has not overridden it. References can be top-level or nested (e.g. Delta's `_metadata.row_commit_version`). Invoked from the `ChangelogTable` constructor.
- New error class `INVALID_CHANGELOG_SCHEMA` with sub-classes `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`.
- Matching `QueryCompilationErrors` helpers for each sub-class.
- rowVersion nullability is enforced at runtime in the carry-over filter in #55508 via `count(rowVersion) = 2` (see the [#55426 NULL-safety thread](https://github.com/apache/spark/pull/55426#discussion_r3120231839) for rationale). rowId nullability is not enforced. It is covered by the `Changelog.rowId()` Javadoc contract.
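The fail-fast checks described above can be pictured with a small, Spark-free sketch. Everything in it is a hypothetical stand-in rather than the code in this PR: the class `ChangelogSchemaCheck`, the exception `InvalidChangelogSchema`, the `validate` and `check` signatures, and the use of plain type-name strings instead of Spark `DataType`s are all assumptions made for illustration. Nested references and the `rowVersion()` path are left out for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, Spark-free model of the checks described above; not the PR's code.
final class ChangelogSchemaCheck {

    // Stand-in for the INVALID_CHANGELOG_SCHEMA.<SUB_CLASS> error conditions.
    static final class InvalidChangelogSchema extends RuntimeException {
        final String subClass;

        InvalidChangelogSchema(String subClass, String detail) {
            super("INVALID_CHANGELOG_SCHEMA." + subClass + ": " + detail);
            this.subClass = subClass;
        }
    }

    // schema maps column name -> type name (assumed encoding). rowId is a Supplier
    // so that it is only invoked when a capability requires row identity.
    static void validate(Map<String, String> schema,
                         Supplier<List<String>> rowId,
                         boolean requiresRowId) {
        requireColumn(schema, "_change_type", "string");
        requireColumn(schema, "_commit_version", null); // connector-defined type
        requireColumn(schema, "_commit_timestamp", "timestamp");
        if (requiresRowId && rowId.get().isEmpty()) {
            throw new InvalidChangelogSchema("MISSING_ROW_ID", "rowId() returned no columns");
        }
    }

    // A null expectedType means "any type is accepted".
    private static void requireColumn(Map<String, String> schema,
                                      String name,
                                      String expectedType) {
        String actual = schema.get(name);
        if (actual == null) {
            throw new InvalidChangelogSchema("MISSING_COLUMN", name);
        }
        if (expectedType != null && !expectedType.equals(actual)) {
            throw new InvalidChangelogSchema(
                "INVALID_COLUMN_TYPE", name + " is " + actual + ", expected " + expectedType);
        }
    }

    // Convenience wrapper: returns "OK" or the failing sub-class name.
    static String check(Map<String, String> schema,
                        Supplier<List<String>> rowId,
                        boolean requiresRowId) {
        try {
            validate(schema, rowId, requiresRowId);
            return "OK";
        } catch (InvalidChangelogSchema e) {
            return e.subClass;
        }
    }

    public static void main(String[] args) {
        Map<String, String> valid = Map.of(
            "id", "long",
            "_change_type", "string",
            "_commit_version", "long",
            "_commit_timestamp", "timestamp");
        System.out.println(check(valid, () -> List.of("id"), true));
        System.out.println(check(valid, () -> List.of(), true));
    }
}
```

Because `rowId` is passed as a `Supplier`, an exception thrown by a connector's default implementation (for example `UnsupportedOperationException`) propagates unchanged out of `validate`, which is the behavior the description attributes to un-overridden `rowId()` and `rowVersion()`.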
### Why are the changes needed?

Gives connector implementors a clear analysis-time error message for misshapen CDC schemas instead of an opaque execution-time failure. For background, see the original PR and its [discussion thread](https://lists.apache.org/thread/dhxx6pohs7fvqc3knzhtoj4tbcgrwxts).

### Does this PR introduce _any_ user-facing change?

Yes, for connector implementors. A connector that returns an invalid changelog schema (missing or wrong-typed metadata column, or advertising a capability requiring row identity without declaring `rowId()`) now fails at analysis time with `INVALID_CHANGELOG_SCHEMA.*`. A connector that advertises a capability requiring `rowId()` or `rowVersion()` without implementing the method surfaces the default `UnsupportedOperationException` at analysis time.

### How was this patch tested?

Added schema-validation cases to `ChangelogResolutionSuite` covering:

- Missing metadata column: `_change_type`, `_commit_version`, `_commit_timestamp`.
- Wrong data type: `_change_type` non-String, `_commit_timestamp` non-Timestamp.
- Connector-defined `_commit_version` type accepted (Integer, Long, String).
- Valid schema with data columns passes.
- Nested rowId and rowVersion references (Delta-style `_metadata.row_id` / `_metadata.row_commit_version`) pass.
- `MISSING_ROW_ID` triggered by `representsUpdateAsDeleteAndInsert = true`.
- `MISSING_ROW_ID` triggered by `containsIntermediateChanges = true`.
- Default `UnsupportedOperationException` on `rowId()` surfaces when a capability requires it.
- Default `UnsupportedOperationException` on `rowVersion()` surfaces when a capability requires it.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
