This is an automated email from the ASF dual-hosted git repository.
HeartSaVioR pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 05b4d81f3f93 [SPARK-56975][SS] Reject user-specified schema in DataStreamReader.table()
05b4d81f3f93 is described below
commit 05b4d81f3f938ff140886d6f66ad66d08c66d5b2
Author: You Zhou <[email protected]>
AuthorDate: Thu May 21 11:57:52 2026 +0900
[SPARK-56975][SS] Reject user-specified schema in DataStreamReader.table()
### What changes were proposed in this pull request?
Make `DataStreamReader.table()` reject user-specified schemas by calling
`assertNoSpecifiedSchema("table")`, mirroring `DataStreamReader.changes()`.
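The guard can be modelled outside Spark as a small, self-contained sketch. `MiniStreamReader`, `SchemaRejected`, and the string-based schema are hypothetical stand-ins, not Spark APIs. Only the error class `_LEGACY_ERROR_TEMP_1189` and the method names `table`, `changes`, and `assertNoSpecifiedSchema` come from this commit. The sketch shows that `table()` now runs the same check that `changes()` already ran, before any catalog lookup happens.

```scala
// Hypothetical stand-in for Spark's AnalysisException carrying an error class.
final class SchemaRejected(val errorClass: String, val operation: String)
    extends RuntimeException(
      s"[$errorClass] User specified schema not supported with `$operation`.")

// Hypothetical, simplified model of the DataStreamReader builder; not Spark's real class.
final class MiniStreamReader(private val userSpecifiedSchema: Option[String] = None) {

  // Builder step: remember the schema the user asked for.
  def schema(ddl: String): MiniStreamReader = new MiniStreamReader(Some(ddl))

  // Plays the role of assertNoSpecifiedSchema: fail fast if .schema(...) was called.
  private def assertNoSpecifiedSchema(operation: String): Unit =
    if (userSpecifiedSchema.nonEmpty) {
      throw new SchemaRejected("_LEGACY_ERROR_TEMP_1189", operation)
    }

  // After this commit: table() checks before resolving the catalog table.
  def table(name: String): String = {
    require(name != null, "The table name can't be null")
    assertNoSpecifiedSchema("table")
    s"stream over catalog table $name"
  }

  // changes() already performed the same check before this commit.
  def changes(name: String): String = {
    assertNoSpecifiedSchema("changes")
    s"change stream over $name"
  }
}
```

With this model, `new MiniStreamReader().table("t")` succeeds, while `new MiniStreamReader().schema("a INT").table("t")` throws `SchemaRejected` whose `operation` is `"table"`.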
### Why are the changes needed?
`DataStreamReader.table()` accepts a user-specified schema without
complaint and then silently ignores it:
```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

spark.readStream
  .schema(new StructType().add("a", IntegerType))
  .table("some_table") // before this change: no error, and the schema has no effect
```
A user-specified schema is not a meaningful input to `.table()`: catalog
tables declare their own schema, and `TableCatalog.loadTable(Identifier)` has
no parameter to receive a user schema, so even if Spark wanted to forward one,
it couldn't. The user's `.schema(...)` call is therefore always a
misconfiguration.
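Why the schema cannot be forwarded follows from the shape of the lookup. The sketch below is a hypothetical reduction: `MiniCatalog.loadTable` takes only an identifier, like `TableCatalog.loadTable(Identifier)`, and the table carries its own schema. `tableBeforeFix` models the pre-commit behavior, in which the user's schema is accepted and then never consulted. All names other than `loadTable` are illustrative, not Spark APIs.

```scala
// Hypothetical table description: the catalog, not the caller, owns the schema.
final case class MiniTable(name: String, schema: Seq[(String, String)])

// Hypothetical catalog. Like TableCatalog.loadTable(Identifier), the lookup has
// no parameter through which a user-specified schema could be passed.
final class MiniCatalog(tables: Map[String, MiniTable]) {
  def loadTable(ident: String): MiniTable =
    tables.getOrElse(ident, throw new NoSuchElementException(s"Table not found: $ident"))
}

object BeforeFix {
  // Models the old table(): userSchema is accepted but never read, so the
  // resulting stream always has the catalog schema.
  def tableBeforeFix(
      catalog: MiniCatalog,
      userSchema: Option[Seq[(String, String)]],
      ident: String): Seq[(String, String)] =
    catalog.loadTable(ident).schema
}
```

For a table created with `spark.range(3)` (a single `id BIGINT` column), passing `Some(Seq("a" -> "INT"))` still yields `Seq("id" -> "BIGINT")`; the user schema is silently dropped.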
The rest of `DataStreamReader` already surfaces this kind of
misconfiguration as a clear error:
- For DataSource V2 providers, `.load()` goes through
`DataSourceV2Utils.getTableFromProvider`, which throws
`_LEGACY_ERROR_TEMP_2242` ("`<provider>` source does not support
user-specified schema") when a schema is given and the provider's
`supportsExternalMetadata()` returns `false`.
- `.changes()` explicitly calls `assertNoSpecifiedSchema("changes")` and
throws `_LEGACY_ERROR_TEMP_1189` ("User specified schema not supported with
`changes`.").
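The `.load()` rule in the first bullet reduces to a single decision. The sketch below is hypothetical: `MiniProvider` stands in for a `TableProvider` whose `supportsExternalMetadata()` result is a plain field, and `resolveForLoad` is an illustrative name, not Spark's `getTableFromProvider`. It shows the contrast this commit addresses: a provider can opt in to user schemas, whereas a catalog table never can, yet only the former path reported the misconfiguration.

```scala
// Hypothetical provider description; the flag plays the role of
// TableProvider.supportsExternalMetadata().
final case class MiniProvider(name: String, supportsExternalMetadata: Boolean, inferred: String)

final class UnsupportedSchema(val errorClass: String, val provider: String)
    extends RuntimeException(
      s"[$errorClass] `$provider` source does not support user-specified schema.")

object LoadPath {
  // With no user schema, use the provider's own (inferred) schema. With one,
  // accept it only if the provider opts in; otherwise fail instead of ignoring it.
  def resolveForLoad(p: MiniProvider, userSchema: Option[String]): String =
    userSchema match {
      case None                                  => p.inferred
      case Some(s) if p.supportsExternalMetadata => s
      case Some(_) =>
        throw new UnsupportedSchema("_LEGACY_ERROR_TEMP_2242", p.name)
    }
}
```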
`.table()` is the odd one out: the same invalid configuration produces no
error. Users can write `readStream.schema(s).table(name)`, see a working
query, and reasonably assume `s` had an effect, when in fact the resulting
stream uses the catalog schema and `s` was dropped. Surfacing this as a clear
error aligns `.table()` with the existing behavior of `.load()` and
`.changes()`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added `DataStreamTableAPISuite` test
`"read: user-specified schema is not allowed with table API"`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #56017 from PorridgeSwim/forbidSpecifySchemaForTable.
Lead-authored-by: You Zhou <[email protected]>
Co-authored-by: You Zhou <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/classic/DataStreamReader.scala | 1 +
.../sql/streaming/test/DataStreamTableAPISuite.scala | 16 ++++++++++++++++
2 files changed, 17 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
index a3ab235372d8..ef7cebdb2a19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala
@@ -102,6 +102,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)
   /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     require(tableName != null, "The table name can't be null")
+    assertNoSpecifiedSchema("table")
     val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val unresolved = UnresolvedRelation(
       identifier,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index dab667731019..f10d1cdab0d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -83,6 +83,22 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     checkErrorTableNotFound(e, "`non_exist_table`")
   }
 
+  test("read: user-specified schema is not allowed with table API") {
+    val tblName = "my_table"
+    withTable(tblName) {
+      spark.range(3).write.format("parquet").saveAsTable(tblName)
+      val e = intercept[AnalysisException] {
+        spark.readStream
+          .schema(new StructType().add("a", IntegerType))
+          .table(tblName)
+      }
+      checkError(
+        exception = e,
+        condition = "_LEGACY_ERROR_TEMP_1189",
+        parameters = Map("operation" -> "table"))
+    }
+  }
+
   test("read: stream table API with temp view") {
     val tblName = "my_table"
     val stream = MemoryStream[Int]