gengliangwang commented on code in PR #51419:
URL: https://github.com/apache/spark/pull/51419#discussion_r3126956334
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala:
##########
@@ -109,6 +110,74 @@ private[sql] object V1Table {
case _ => None
}
}
+
+ def toCatalogTable(
+ catalog: CatalogPlugin,
+ ident: Identifier,
+ t: MetadataOnlyTable): CatalogTable = {
+ val info = t.getTableInfo
+ val props = info.properties.asScala.toMap
+ val tableType = props.get(TableCatalog.PROP_TABLE_TYPE) match {
+ case Some(TableSummary.VIEW_TABLE_TYPE) => CatalogTableType.VIEW
+ case Some(TableSummary.MANAGED_TABLE_TYPE) => CatalogTableType.MANAGED
+ case _ => CatalogTableType.EXTERNAL
+ }
+ val viewText = props.get(TableCatalog.PROP_VIEW_TEXT)
+ // Reserved keys are promoted to first-class CatalogTable fields; strip them from the
+ // user-visible properties map so they're not double-persisted or leaked into the serde bag.
+ val userProps = props -- CatalogV2Util.TABLE_RESERVED_PROPERTIES
+ val (serdeProps, tableProps) = userProps.toSeq
+ .partition(_._1.startsWith(TableCatalog.OPTION_PREFIX))
+ val tablePropsMap = tableProps.toMap
+ val (partCols, bucketSpec, clusterBySpec) = info.partitions.toSeq.convertTransforms
+ // For views, translate the V2 view context (PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE, a
+ // single quoted multi-part identifier whose first part is the catalog) into V1's numbered
+ // viewCatalogAndNamespace properties so the V1 view resolution path can expand unqualified
+ // identifiers in the view text.
+ val viewContextProps = if (tableType == CatalogTableType.VIEW) {
+ props.get(TableCatalog.PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE) match {
+ case Some(s) if s.nonEmpty =>
+ val parts = CatalystSqlParser.parseMultipartIdentifier(s)
+ CatalogTable.catalogAndNamespaceToProps(parts.head, parts.tail)
+ case _ =>
+ Map.empty[String, String]
+ }
+ } else {
+ Map.empty[String, String]
+ }
+ CatalogTable(
+ // CatalogTable.identifier uses a single-string database; for multi-part namespaces we
+ // preserve only the last part here and record the full multi-part form in `fullIdentOpt`
+ // below. Callers needing the real fully-qualified name (e.g. cyclic view detection)
+ // should read `CatalogTable.fullIdent`.
+ identifier = TableIdentifier(
+ table = ident.name(),
+ database = ident.namespace().lastOption,
+ catalog = Some(catalog.name())),
+ tableType = tableType,
+ storage = CatalogStorageFormat.empty.copy(
+ locationUri = props.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI),
+ // v2 table properties should be put into the serde properties as well in case
+ // they contain data source options.
+ properties = tablePropsMap ++ serdeProps.map {
+ case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
+ }
+ ),
+ schema = CatalogV2Util.v2ColumnsToStructType(info.columns),
+ provider = props.get(TableCatalog.PROP_PROVIDER),
+ partitionColumnNames = partCols,
+ bucketSpec = bucketSpec,
+ owner = props.getOrElse(TableCatalog.PROP_OWNER, "unknown"),
+ viewText = viewText,
Review Comment:
`viewText` is read and assigned unconditionally. If a catalog returns a
`MetadataOnlyTable` with `PROP_VIEW_TEXT` set but `PROP_TABLE_TYPE` is
EXTERNAL/MANAGED (misconfiguration, or a future capability-composition
catalog), the synthesized `CatalogTable` ends up with non-None `viewText` on a
non-view — confusing downstream code that uses `viewText.isDefined` as an
"is-view" proxy. Scoping the read to `VIEW` costs nothing:
```suggestion
val viewText = if (tableType == CatalogTableType.VIEW) {
props.get(TableCatalog.PROP_VIEW_TEXT)
} else {
None
}
```
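For illustration, here is a plain-Scala model of the suggested scoping. It does not use Spark: `TableKind`, the `table_type` / `view_text` keys and the `"VIEW"` / `"MANAGED"` values are hypothetical stand-ins for `CatalogTableType`, `PROP_TABLE_TYPE`, `PROP_VIEW_TEXT` and the `TableSummary` constants. It exercises the misconfigured case described above:

```scala
// Plain-Scala model of the suggested viewText scoping. All names, keys and values
// here are hypothetical stand-ins, not Spark's CatalogTableType / TableCatalog constants.
object ViewTextScoping {
  sealed trait TableKind
  case object View extends TableKind
  case object Managed extends TableKind
  case object External extends TableKind

  val PropTableType = "table_type"
  val PropViewText = "view_text"

  // Same shape as the table-type match in toCatalogTable: anything unrecognized is External.
  def tableKind(props: Map[String, String]): TableKind =
    props.get(PropTableType) match {
      case Some("VIEW")    => View
      case Some("MANAGED") => Managed
      case _               => External
    }

  // Suggested behavior: view text is only surfaced for entries typed as views.
  def viewText(props: Map[String, String]): Option[String] =
    if (tableKind(props) == View) props.get(PropViewText) else None
}
```

In this model, `viewText(Map("table_type" -> "EXTERNAL", "view_text" -> "SELECT 1"))` is `None`, whereas the unconditional read would hand `Some("SELECT 1")` to code that treats `viewText.isDefined` as "is a view".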
##########
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala:
##########
@@ -766,9 +770,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
object ResolvedViewIdentifier {
+ // Only matches session-catalog persistent views. Non-session-catalog persistent views
+ // (produced for `MetadataOnlyTable`) fall through so they can be picked up by v2 strategies
+ // rather than silently collapsed to a v1 `TableIdentifier`.
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
- case ResolvedPersistentView(catalog, ident, _) =>
- assert(isSessionCatalog(catalog))
+ case ResolvedPersistentView(catalog, ident, _) if isSessionCatalog(catalog) =>
Review Comment:
The comment above says non-session views "fall through so they can be picked
up by v2 strategies." Only `AlterViewAs` is actually picked up.
`ResolvedViewIdentifier` is also used by `SetViewProperties` (line 176),
`UnsetViewProperties` (line 179), `AlterViewSchemaBinding` (line 520),
`RenameTable`-on-view (line 194), and `DescribeRelation`-on-view (lines 198-199),
none of which has a v2 strategy case. After this PR, a user can run `CREATE VIEW
view_catalog.ns.v ...`, and a subsequent `ALTER VIEW view_catalog.ns.v SET
TBLPROPERTIES('k'='v')` then fails with a generic planner "no plan for..." error.
Pre-PR, `CreateView` was rejected on non-session catalogs, so this orphan state
was structurally unreachable; the pre-PR error path
(`MISSING_CATALOG_ABILITY.VIEWS`) is no longer reached.
**There is no test coverage** for any of these five plans on a v2 view:
`DataSourceV2MetadataOnlyViewSuite` has no test for `SET TBLPROPERTIES`, `UNSET
TBLPROPERTIES`, `WITH SCHEMA BINDING`, `RENAME TO`, `DESCRIBE`, `SHOW
TBLPROPERTIES`, or `SHOW CREATE VIEW` against a v2-catalog view (the only `WITH
SCHEMA EVOLUTION` reference is on the `CREATE VIEW` path, line 462). The TODO
at lines 36-37 of that suite acknowledges these as follow-ups, but nothing pins
the current failure mode, so any future change (e.g., a reshape of planner
error classes) can silently regress the UX further.
Two options:
1. Pin the current behavior — for each of the five plan types, add a test
that runs the statement against a v2 view and asserts the error it throws
today. Future changes then surface in the diff.
2. Close the gap up front — add explicit `DataSourceV2Strategy` cases (or a
fall-through in `ResolveSessionCatalog`) that throw a clean
`UNSUPPORTED_FEATURE` / `FEATURE_NOT_YET_SUPPORTED` naming the statement. Tests
become one-per-plan and the UX doesn't regress between this PR and the
follow-ups.
Option 2 is the cleaner closure given this PR is already landing the
architectural change that enables the orphaning. Option 1 is the minimum safety
net.
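A rough sketch of what option 2 could look like, modeled in plain Scala rather than against the real planner. The command case classes, `UnsupportedV2ViewCommand`, and the `UNSUPPORTED_FEATURE` tag are hypothetical simplifications; the actual change would add cases to `DataSourceV2Strategy` (or a fall-through in `ResolveSessionCatalog`) and raise the error through Spark's existing error helpers:

```scala
// Plain-Scala model of "every v2 view command gets an explicit case".
// The plan types and the error class are hypothetical stand-ins.
object V2ViewCommandGate {
  sealed trait ViewCommand { def ident: String }
  final case class AlterViewAs(ident: String, sql: String) extends ViewCommand
  final case class SetViewProperties(ident: String, props: Map[String, String]) extends ViewCommand
  final case class UnsetViewProperties(ident: String, keys: Seq[String]) extends ViewCommand
  final case class AlterViewSchemaBinding(ident: String, mode: String) extends ViewCommand
  final case class RenameView(ident: String, newName: String) extends ViewCommand
  final case class DescribeView(ident: String) extends ViewCommand

  // Carries an error-class-style tag so tests can assert on it rather than on message text.
  final class UnsupportedV2ViewCommand(val errorClass: String, val statement: String)
    extends RuntimeException(s"[$errorClass] $statement is not yet supported on v2 views.")

  // No command falls through to a generic "no plan for..." failure.
  def plan(cmd: ViewCommand): String = cmd match {
    case AlterViewAs(ident, _)     => s"AlterV2ViewExec($ident)"
    case _: SetViewProperties      => unsupported("ALTER VIEW ... SET TBLPROPERTIES")
    case _: UnsetViewProperties    => unsupported("ALTER VIEW ... UNSET TBLPROPERTIES")
    case _: AlterViewSchemaBinding => unsupported("ALTER VIEW ... WITH SCHEMA")
    case _: RenameView             => unsupported("ALTER VIEW ... RENAME TO")
    case _: DescribeView           => unsupported("DESCRIBE TABLE")
  }

  private def unsupported(statement: String): Nothing =
    throw new UnsupportedV2ViewCommand("UNSUPPORTED_FEATURE", statement)
}
```

Because the model's hierarchy is sealed, the compiler warns about a non-exhaustive match if a new view command is added without a case. Spark's `LogicalPlan` is not sealed, so in the real planner the one-test-per-plan coverage is what would catch that drift.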
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -301,6 +301,44 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
qualifyLocInTableSpec(tableSpec), orCreate = orCreate,
invalidateCache) :: Nil
}
+ case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
+ collation, properties, originalText, child, allowExisting, replace, viewSchemaMode,
+ _, _) =>
+ // Gate on TableCatalog + SUPPORTS_VIEW together so non-TableCatalog plugins still
+ // surface the VIEWS-specific error (instead of the generic TABLES error that
+ // asTableCatalog would throw).
+ val tableCatalog = catalog match {
+ case tc: TableCatalog
+ if tc.capabilities().contains(TableCatalogCapability.SUPPORTS_VIEW) => tc
Review Comment:
Nice cleanup on the `CheckViewReferences` side with the new
`catalogAndIdent` helper. The same `case tc: TableCatalog if
tc.capabilities().contains(TableCatalogCapability.SUPPORTS_VIEW) => tc; case _
=> throw missingCatalogViewsAbilityError(catalog)` pattern is still duplicated
at lines 310-314 (CREATE VIEW) and 330-334 (ALTER VIEW AS). Similarly the
`TableIdentifier(ident.name, ident.namespace.lastOption, Some(catalog.name))`
idiom for error-rendering is repeated in `CreateV2ViewExec:60-64` and
`V1Table.toCatalogTable:153-156`. Small helpers — e.g.
`CatalogV2Util.requireViewSupport(catalog)` and an
`asLegacyTableIdentifier(catalogName)` on `IdentifierHelper` — would eliminate
the remaining drift risk. Non-blocking.
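A minimal sketch of the two helpers, assuming simplified stand-in types: the `Capability`, `CatalogPlugin`, `TableCatalog`, `Identifier` and `TableIdentifier` below are not Spark's, and `requireViewSupport` / `asLegacyTableIdentifier` are only the hypothetical names suggested above. In Spark the second would more naturally be an extension on `IdentifierHelper`; it is a plain function here to keep the example self-contained:

```scala
// Plain-Scala model of the two suggested helpers; all types are simplified stand-ins.
object ViewSupportHelpers {
  sealed trait Capability
  case object SupportsView extends Capability

  trait CatalogPlugin { def name: String }
  trait TableCatalog extends CatalogPlugin { def capabilities: Set[Capability] }

  final case class Identifier(namespace: Seq[String], name: String)
  final case class TableIdentifier(table: String, database: Option[String], catalog: Option[String])

  final class MissingViewsAbility(catalogName: String)
    extends RuntimeException(s"[MISSING_CATALOG_ABILITY.VIEWS] $catalogName does not support views")

  // Single place that encodes the "TableCatalog + SUPPORTS_VIEW" gate for CREATE and ALTER.
  def requireViewSupport(catalog: CatalogPlugin): TableCatalog = catalog match {
    case tc: TableCatalog if tc.capabilities.contains(SupportsView) => tc
    case _ => throw new MissingViewsAbility(catalog.name)
  }

  // Legacy single-database rendering: only the last namespace part survives.
  def asLegacyTableIdentifier(ident: Identifier, catalogName: String): TableIdentifier =
    TableIdentifier(ident.name, ident.namespace.lastOption, Some(catalogName))
}
```

For example, `asLegacyTableIdentifier(Identifier(Seq("ns1", "inner"), "v"), "view_catalog")` yields `TableIdentifier("v", Some("inner"), Some("view_catalog"))`, so the last-part-only truncation is spelled out once instead of at each call site.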
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetadataOnlyViewSuite.scala:
##########
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, MetadataOnlyTable, StagedTable, StagingTableCatalog, Table, TableCatalog, TableCatalogCapability, TableChange, TableInfo, TableSummary}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Tests for the view side of [[MetadataOnlyTable]]: view-text expansion on read, and
+ * CREATE VIEW / ALTER VIEW ... AS going through the v2 write path
+ * (`CreateV2ViewExec` / `AlterV2ViewExec` and their atomic staging variants).
+ * Data-source-table read paths live in
+ * [[org.apache.spark.sql.connector.DataSourceV2MetadataOnlyTableSuite]].
+ *
+ * TODO: once the remaining v2 view DDL is implemented (SET/UNSET TBLPROPERTIES, SHOW CREATE
+ * VIEW, RENAME TO, SCHEMA BINDING, DESCRIBE / SHOW TBLPROPERTIES on v2 views), register a
+ * `MetadataOnlyTable`-backed `DelegatingCatalogExtension` as `spark.sql.catalog.spark_catalog`
+ * and run the shared [[org.apache.spark.sql.execution.PersistedViewTestSuite]] body against
+ * the v2 path for full parity with the v1 persisted-view coverage.
+ */
+class DataSourceV2MetadataOnlyViewSuite extends QueryTest with SharedSparkSession {
+ import testImplicits._
+
+ override def sparkConf: SparkConf = super.sparkConf
+ .set("spark.sql.catalog.view_catalog", classOf[TestingViewCatalog].getName)
+
+ // --- View read path -----------------------------------------------------
+
+ test("read view expands SQL text and applies captured SQL configs") {
+ withTable("spark_catalog.default.t") {
+ Seq("a", "b").toDF("col").write.saveAsTable("spark_catalog.default.t")
+ // view_catalog.ansi.test_view stores view.sqlConfig.spark.sql.ansi.enabled=true;
+ // view_catalog.non_ansi.test_view stores it =false. The view body does
+ // `col::int` which errors in ANSI mode and yields NULL in non-ANSI mode.
+ intercept[Exception](spark.table("view_catalog.ansi.test_view").collect())
+ checkAnswer(spark.table("view_catalog.non_ansi.test_view"), Row("b", null))
+ }
+ }
+
+ test("read view resolves unqualified refs via captured current catalog/namespace") {
Review Comment:
The PR's multi-level-namespace correctness hinges on the
`QuotingUtils.quoted` → `parseMultipartIdentifier` round-trip and the v1
unqualified-reference expansion both preserving the captured namespace. Today
that's covered by (a) a builder-level serialization test (line 83) and (b)
cycle-detection tests using `ns1.inner.v`-style identifiers. But there is no
end-to-end *read* test that exercises the round-trip by actually resolving an
unqualified reference inside a view whose captured namespace has >1 part. Add
one: `.withCurrentCatalogAndNamespace("spark_catalog", Array("db1", "db2"))`,
create a table in that namespace, reference it unqualified in the view body,
and check the result.
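The property such a test would pin is a round trip: whatever is written for the captured catalog and namespace must parse back to the same parts, including parts that need quoting. Below is a self-contained model of that round trip. `quoteIfNeeded`, `quoted` and `parseMultipart` are simplified stand-ins written for this example, not `QuotingUtils.quoted` or `CatalystSqlParser.parseMultipartIdentifier`, and their quoting rule (quote unless the part is letters, digits and underscores only) is an assumption:

```scala
// Simplified model of the quote -> parse round trip for a captured catalog + namespace.
// Not Spark's QuotingUtils / CatalystSqlParser; the quoting rule is an assumption.
object NamespaceRoundTrip {
  // Quote a part unless it is a plain identifier; embedded backticks are doubled.
  def quoteIfNeeded(part: String): String =
    if (part.nonEmpty && part.forall(c => c.isLetterOrDigit || c == '_')) part
    else "`" + part.replace("`", "``") + "`"

  def quoted(parts: Seq[String]): String = parts.map(quoteIfNeeded).mkString(".")

  // Inverse of `quoted`: split on dots outside backticks, unescape doubled backticks.
  def parseMultipart(s: String): Seq[String] = {
    val parts = scala.collection.mutable.ArrayBuffer.empty[String]
    val cur = new StringBuilder
    var inQuotes = false
    var i = 0
    while (i < s.length) {
      val c = s.charAt(i)
      if (inQuotes) {
        if (c == '`' && i + 1 < s.length && s.charAt(i + 1) == '`') { cur.append('`'); i += 1 }
        else if (c == '`') inQuotes = false
        else cur.append(c)
      } else if (c == '`') inQuotes = true
      else if (c == '.') { parts += cur.toString; cur.clear() }
      else cur.append(c)
      i += 1
    }
    parts += cur.toString
    parts.toSeq
  }
}
```

Splitting the parsed result into `parts.head` / `parts.tail` then yields the catalog and namespace, as in `V1Table.toCatalogTable`. The model only covers serialization; the end-to-end read test is still needed to exercise the v1 unqualified-reference expansion.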
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2MetadataOnlyViewSuite.scala:
##########
@@ -0,0 +1,741 @@
+
+ test("read view resolves unqualified refs via captured current catalog/namespace") {
+ withTable("spark_catalog.default.t") {
+ Seq("a", "b").toDF("col").write.saveAsTable("spark_catalog.default.t")
+ // View text uses the unqualified name `t`; it resolves via the stored
+ // current catalog / namespace properties.
+ checkAnswer(spark.table("view_catalog.ns.test_unqualified_view"), Row("b"))
+ }
+ }
+
+ // --- TableInfo.Builder unit tests for view-specific properties ----------
+
+ test("view current catalog/namespace are serialized into a single property") {
+ val info = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withViewText("SELECT * FROM t")
+ .withCurrentCatalogAndNamespace("spark_catalog", Array("default"))
+ .build()
+ val table = new MetadataOnlyTable(info)
+ assert(table.properties().get(TableCatalog.PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE) ==
+ "spark_catalog.default")
+ }
+
+ test("view current catalog/namespace quotes multi-part names with dots") {
+ val info = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withViewText("SELECT * FROM t")
+ .withCurrentCatalogAndNamespace("spark_catalog", Array("weird.db", "normal"))
+ .build()
+ val table = new MetadataOnlyTable(info)
+ assert(table.properties().get(TableCatalog.PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE) ==
+ "spark_catalog.`weird.db`.normal")
+ }
+
+ test("view with no current catalog/namespace omits the property") {
+ val info = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withViewText("SELECT * FROM spark_catalog.default.t")
+ .build()
+ val table = new MetadataOnlyTable(info)
+ assert(!table.properties().containsKey(
+ TableCatalog.PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE))
+ }
+
+ test("withCurrentCatalogAndNamespace clears the property when catalog is null or empty") {
+ val infoNull = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withViewText("SELECT 1 AS col")
+ .withCurrentCatalogAndNamespace("spark_catalog", Array("default"))
+ .withCurrentCatalogAndNamespace(null, Array("ignored"))
+ .build()
+ assert(!infoNull.properties().containsKey(
+ TableCatalog.PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE))
+
+ val infoEmpty = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withViewText("SELECT 1 AS col")
+ .withCurrentCatalogAndNamespace("spark_catalog", Array("default"))
+ .withCurrentCatalogAndNamespace("", Array("ignored"))
+ .build()
+ assert(!infoEmpty.properties().containsKey(
+ TableCatalog.PROP_VIEW_CURRENT_CATALOG_AND_NAMESPACE))
+ }
+
+ // --- CREATE VIEW on a plain TableCatalog --------------------------------
+
+ test("CREATE VIEW on a v2 catalog") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.my_view AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 1")
+ checkAnswer(spark.table("view_catalog.default.my_view"), Seq(Row(2), Row(3)))
+ }
+ }
+
+ test("CREATE VIEW IF NOT EXISTS is a no-op when the view exists") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_ifne AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ // Re-running with IF NOT EXISTS should not fail and should not change the view.
+ sql("CREATE VIEW IF NOT EXISTS view_catalog.default.v_ifne AS " +
+ "SELECT x + 100 AS x FROM spark_catalog.default.t")
+ checkAnswer(spark.table("view_catalog.default.v_ifne"),
+ Seq(Row(1), Row(2), Row(3)))
+ }
+ }
+
+ test("CREATE VIEW without IF NOT EXISTS fails when the view exists") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_dup AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_dup AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ }
+ }
+ }
+
+ test("CREATE OR REPLACE VIEW replaces an existing view") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_replace AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 10")
+ checkAnswer(spark.table("view_catalog.default.v_replace"), Seq.empty[Row])
+ sql("CREATE OR REPLACE VIEW view_catalog.default.v_replace AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 1")
+ checkAnswer(spark.table("view_catalog.default.v_replace"), Seq(Row(2), Row(3)))
+ }
+ }
+
+ test("CREATE VIEW on a catalog without SUPPORTS_VIEW fails") {
+ withSQLConf(
+ "spark.sql.catalog.no_view_catalog" -> classOf[TestingTableOnlyCatalog].getName) {
+ val ex = intercept[AnalysisException] {
+ sql("CREATE VIEW no_view_catalog.default.v AS SELECT 1")
+ }
+ assert(ex.getCondition == "MISSING_CATALOG_ABILITY.VIEWS")
+ }
+ }
+
+ test("CREATE VIEW rejects too-few / too-many user-specified columns") {
+ withTable("spark_catalog.default.t") {
+ Seq(1 -> 10).toDF("x", "y").write.saveAsTable("spark_catalog.default.t")
+ intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_few (a) AS " +
+ "SELECT x, y FROM spark_catalog.default.t")
+ }
+ intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_many (a, b, c) AS " +
+ "SELECT x, y FROM spark_catalog.default.t")
+ }
+ }
+ }
+
+ test("CREATE VIEW rejects reference to a temporary function") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ spark.udf.register("temp_udf", (i: Int) => i + 1)
+ val ex = intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_tempfn AS " +
+ "SELECT temp_udf(x) FROM spark_catalog.default.t")
+ }
+ assert(ex.getMessage.toLowerCase(java.util.Locale.ROOT).contains("temporary"))
+ }
+ }
+
+ test("CREATE VIEW rejects reference to a temporary view") {
+ withTempView("tv") {
+ spark.range(3).createOrReplaceTempView("tv")
+ val ex = intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_tempview AS SELECT id FROM tv")
+ }
+ assert(ex.getMessage.toLowerCase(java.util.Locale.ROOT).contains("temporary"))
+ }
+ }
+
+ test("CREATE VIEW rejects reference to a temporary variable") {
+ withSessionVariable("temp_var") {
+ sql("DECLARE VARIABLE temp_var INT DEFAULT 1")
+ val ex = intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_tempvar AS SELECT temp_var AS x")
+ }
+ assert(ex.getMessage.toLowerCase(java.util.Locale.ROOT).contains("temporary"))
+ }
+ }
+
+ test("CREATE VIEW propagates DEFAULT COLLATION to TableInfo") {
+ withTable("spark_catalog.default.t") {
+ Seq("a", "b").toDF("col").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_coll DEFAULT COLLATION UTF8_BINARY AS " +
+ "SELECT col FROM spark_catalog.default.t")
+ // TestingViewCatalog stores the TableInfo verbatim, so the collation property is
+ // observable via the catalog-stored builder output.
+ val catalog = spark.sessionState.catalogManager.catalog("view_catalog")
+ .asInstanceOf[TestingViewCatalog]
+ val info = catalog.getStoredView(Array("default"), "v_coll")
+ assert(info.properties().get(TableCatalog.PROP_COLLATION) == "UTF8_BINARY")
+ }
+ }
+
+ test("CREATE OR REPLACE VIEW detects cyclic view references") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_cycle_a AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_cycle_b AS " +
+ "SELECT x FROM view_catalog.default.v_cycle_a")
+ val ex = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW view_catalog.default.v_cycle_a AS " +
+ "SELECT x FROM view_catalog.default.v_cycle_b")
+ }
+ assert(ex.getCondition == "RECURSIVE_VIEW")
+ }
+ }
+
+ test("CREATE VIEW over a non-view table entry is rejected (plain TableCatalog)") {
+ val catalog = spark.sessionState.catalogManager.catalog("view_catalog")
+ .asInstanceOf[TestingViewCatalog]
+ val tableIdent = Identifier.of(Array("default"), "v_existing_table")
+ val tableInfo = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withTableType(TableSummary.EXTERNAL_TABLE_TYPE)
+ .build()
+ catalog.createTable(tableIdent, tableInfo)
+ try {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+
+ // CREATE OR REPLACE VIEW must not silently destroy a non-view table -- v1 parity.
+ val replaceEx = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW view_catalog.default.v_existing_table AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ }
+ assert(replaceEx.getCondition == "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE")
+
+ // Plain CREATE VIEW over a table surfaces TABLE_OR_VIEW_ALREADY_EXISTS, matching v1.
+ val createEx = intercept[AnalysisException] {
+ sql("CREATE VIEW view_catalog.default.v_existing_table AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ }
+ assert(createEx.getCondition == "TABLE_OR_VIEW_ALREADY_EXISTS")
+
+ // CREATE VIEW IF NOT EXISTS is a no-op -- the table entry is untouched.
+ sql("CREATE VIEW IF NOT EXISTS view_catalog.default.v_existing_table AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ val stored = catalog.getStoredView(Array("default"), "v_existing_table")
+ assert(stored.properties().get(TableCatalog.PROP_TABLE_TYPE) ==
+ TableSummary.EXTERNAL_TABLE_TYPE)
+ }
+ } finally {
+ catalog.dropTable(tableIdent)
+ }
+ }
+
+ // --- CREATE VIEW on a StagingTableCatalog -------------------------------
+
+ test("CREATE VIEW on a StagingTableCatalog uses the atomic exec") {
+ withSQLConf(
+ "spark.sql.catalog.staging_catalog" -> classOf[TestingStagingCatalog].getName) {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+
+ // Plain CREATE -- exercises stageCreate.
+ sql("CREATE VIEW staging_catalog.default.v_atomic AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 1")
+ checkAnswer(
+ spark.table("staging_catalog.default.v_atomic"),
+ Seq(Row(2), Row(3)))
+
+ // Second CREATE without IF NOT EXISTS -- should surface viewAlreadyExistsError
+ // (TestingStagingCatalog's stageCreate throws TableAlreadyExistsException, which the
+ // exec wraps).
+ val ex = intercept[AnalysisException] {
+ sql("CREATE VIEW staging_catalog.default.v_atomic AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 1")
+ }
+ assert(ex.getMessage.toLowerCase(java.util.Locale.ROOT).contains("already exists"))
+
+ // CREATE OR REPLACE -- exercises stageCreateOrReplace.
+ sql("CREATE OR REPLACE VIEW staging_catalog.default.v_atomic AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 2")
+ checkAnswer(spark.table("staging_catalog.default.v_atomic"), Row(3))
+
+ // CREATE IF NOT EXISTS on an existing view -- no-op, but the body is still validated
+ // first (the atomic exec builds the TableInfo before the allow-existing short-circuit),
+ // so a malformed body is rejected even when creation is skipped.
+ sql("CREATE VIEW IF NOT EXISTS staging_catalog.default.v_atomic AS " +
+ "SELECT x + 100 AS x FROM spark_catalog.default.t")
+ // Value unchanged -- IF NOT EXISTS was a no-op.
+ checkAnswer(spark.table("staging_catalog.default.v_atomic"), Row(3))
+ }
+ }
+ }
+
+ test("CREATE VIEW over a non-view table entry is rejected (StagingTableCatalog)") {
+ withSQLConf(
+ "spark.sql.catalog.staging_catalog" -> classOf[TestingStagingCatalog].getName) {
+ val stagingCatalog = spark.sessionState.catalogManager.catalog("staging_catalog")
+ .asInstanceOf[TestingStagingCatalog]
+ val tableIdent = Identifier.of(Array("default"), "v_existing_table")
+ val tableInfo = new TableInfo.Builder()
+ .withSchema(new StructType().add("col", "string"))
+ .withTableType(TableSummary.EXTERNAL_TABLE_TYPE)
+ .build()
+ stagingCatalog.createTable(tableIdent, tableInfo)
+ try {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+
+ // CREATE OR REPLACE VIEW must not silently destroy a non-view table. On a staging
+ // catalog this specifically guards against `stageCreateOrReplace` committing over
+ // the table.
+ val replaceEx = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW staging_catalog.default.v_existing_table AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ }
+ assert(replaceEx.getCondition == "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE")
+
+ val createEx = intercept[AnalysisException] {
+ sql("CREATE VIEW staging_catalog.default.v_existing_table AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ }
+ assert(createEx.getCondition == "TABLE_OR_VIEW_ALREADY_EXISTS")
+
+ sql("CREATE VIEW IF NOT EXISTS staging_catalog.default.v_existing_table AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ val loaded = stagingCatalog.loadTable(tableIdent).asInstanceOf[MetadataOnlyTable]
+ assert(loaded.getTableInfo.properties.get(TableCatalog.PROP_TABLE_TYPE) ==
+ TableSummary.EXTERNAL_TABLE_TYPE)
+ }
+ } finally {
+ stagingCatalog.dropTable(tableIdent)
+ }
+ }
+ }
+
+ // --- ALTER VIEW ---------------------------------------------------------
+
+ test("ALTER VIEW ... AS updates the view body on a v2 catalog") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_alter AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 10")
+ checkAnswer(spark.table("view_catalog.default.v_alter"), Seq.empty[Row])
+
+ sql("ALTER VIEW view_catalog.default.v_alter AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 1")
+ checkAnswer(spark.table("view_catalog.default.v_alter"), Seq(Row(2), Row(3)))
+ }
+ }
+
+ test("ALTER VIEW on a missing view fails at analysis") {
+ // UnresolvedView resolves through lookupTableOrView and the missing view surfaces as an
+ // AnalysisException before we ever reach the v2 exec. The exact error condition (e.g.
+ // TABLE_OR_VIEW_NOT_FOUND) varies across Spark versions; we just assert we fail cleanly.
+ intercept[AnalysisException] {
+ sql("ALTER VIEW view_catalog.default.does_not_exist AS SELECT 1 AS x")
+ }
+ }
+
+ test("ALTER VIEW rejects reference to a temporary function") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_alter_tempfn AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ spark.udf.register("temp_udf_alter", (i: Int) => i + 1)
+ val ex = intercept[AnalysisException] {
+ sql("ALTER VIEW view_catalog.default.v_alter_tempfn AS " +
+ "SELECT temp_udf_alter(x) FROM spark_catalog.default.t")
+ }
+ assert(ex.getMessage.toLowerCase(java.util.Locale.ROOT).contains("temporary"))
+ }
+ }
+
+ test("ALTER VIEW preserves user-set TBLPROPERTIES") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_preserve " +
+ "TBLPROPERTIES ('mykey'='myvalue') AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ sql("ALTER VIEW view_catalog.default.v_preserve AS " +
+ "SELECT x + 1 AS x FROM spark_catalog.default.t")
+
+ val catalog = spark.sessionState.catalogManager.catalog("view_catalog")
+ .asInstanceOf[TestingViewCatalog]
+ val info = catalog.getStoredView(Array("default"), "v_preserve")
+ assert(info.properties().get("mykey") == "myvalue")
+ }
+ }
+
+ test("ALTER VIEW preserves PROP_OWNER (v1-parity)") {
+ val catalog = spark.sessionState.catalogManager.catalog("view_catalog")
+ .asInstanceOf[TestingViewCatalog]
+ val viewIdent = Identifier.of(Array("default"), "v_owner")
+ // Pre-seed a view whose stored TableInfo carries an explicit owner.
+ val initialInfo = new TableInfo.Builder()
+ .withSchema(new StructType().add("x", "int"))
+ .withViewText("SELECT 1 AS x")
+ .withOwner("alice")
+ .withCurrentCatalogAndNamespace("spark_catalog", Array("default"))
+ .build()
+ catalog.createTable(viewIdent, initialInfo)
+ try {
+ withTable("spark_catalog.default.t") {
+ Seq(2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("ALTER VIEW view_catalog.default.v_owner AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ // v1 ALTER VIEW AS carries `owner` forward via `viewMeta.copy(...)`. v2 must match:
+ // the stored TableInfo after the ALTER should still have the original owner.
+ val info = catalog.getStoredView(Array("default"), "v_owner")
+ assert(info.properties().get(TableCatalog.PROP_OWNER) == "alice")
+ }
+ } finally {
+ catalog.dropTable(viewIdent)
+ }
+ }
+
+ test("ALTER VIEW preserves SCHEMA EVOLUTION binding mode") {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW view_catalog.default.v_evo WITH SCHEMA EVOLUTION AS " +
+ "SELECT x FROM spark_catalog.default.t")
+ sql("ALTER VIEW view_catalog.default.v_evo AS " +
+ "SELECT x + 1 AS x FROM spark_catalog.default.t")
+
+ val catalog = spark.sessionState.catalogManager.catalog("view_catalog")
+ .asInstanceOf[TestingViewCatalog]
+ val info = catalog.getStoredView(Array("default"), "v_evo")
+ // Use the same stored key v1 uses (CatalogTable.VIEW_SCHEMA_MODE = "view.schemaMode").
+ assert(info.properties().get("view.schemaMode") == "EVOLUTION")
+ }
+ }
+
+ test("ALTER VIEW on a StagingTableCatalog uses the atomic exec (stageReplace)") {
+ withSQLConf(
+ "spark.sql.catalog.staging_catalog" -> classOf[TestingStagingCatalog].getName) {
+ withTable("spark_catalog.default.t") {
+ Seq(1, 2, 3).toDF("x").write.saveAsTable("spark_catalog.default.t")
+ sql("CREATE VIEW staging_catalog.default.v_atomic_alter AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 10")
+ checkAnswer(spark.table("staging_catalog.default.v_atomic_alter"), Seq.empty[Row])
+
+ sql("ALTER VIEW staging_catalog.default.v_atomic_alter AS " +
+ "SELECT x FROM spark_catalog.default.t WHERE x > 1")
+ checkAnswer(
+ spark.table("staging_catalog.default.v_atomic_alter"),
+ Seq(Row(2), Row(3)))
+ }
+ }
+ }
+
+ test("ALTER VIEW on a catalog without SUPPORTS_VIEW fails") {
Review Comment:
`TestingTableOnlyCatalog.loadTable` always throws `NoSuchTableException`, so
the ALTER fails at view resolution — the capability gate in
`DataSourceV2Strategy` (lines 330-333) is never reached. The test body's own
comment acknowledges this. As a result the capability-gate rejection on the
ALTER path has no real coverage.
Two fix options:
1. Rename to `"ALTER VIEW on a missing view fails"` — matches what it
actually tests.
2. Better: extend `TestingTableOnlyCatalog` to store a `MetadataOnlyTable`
view so the gate rejection is genuinely exercised. This would also catch
regressions if `SUPPORTS_VIEW` is inadvertently added to the default capability
set.
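To make option 2 concrete, here is a plain-Scala model of why the gate is unreachable today and what seeding a view changes. `TableOnlyCatalog`, `ViewEntry` and the bracketed error tags are hypothetical simplifications; in the suite, the seeded entry would be a `MetadataOnlyTable` carrying view text:

```scala
// Plain-Scala model of the ALTER VIEW ordering problem; names and tags are hypothetical.
object AlterViewGateModel {
  sealed trait Entry
  final case class ViewEntry(sql: String) extends Entry

  final class NoSuchTable(ident: String)
    extends RuntimeException(s"[TABLE_OR_VIEW_NOT_FOUND] $ident")
  final class MissingViewsAbility(catalog: String)
    extends RuntimeException(s"[MISSING_CATALOG_ABILITY.VIEWS] $catalog")

  // A table-only catalog (no SUPPORTS_VIEW) that can optionally be pre-seeded.
  final class TableOnlyCatalog(val name: String, seeded: Map[String, Entry]) {
    val supportsView: Boolean = false
    def loadTable(ident: String): Entry =
      seeded.getOrElse(ident, throw new NoSuchTable(ident))
  }

  // ALTER VIEW ... AS: view resolution (loadTable) runs before the planner's capability
  // gate, so an empty catalog fails at resolution and never reaches the gate.
  def alterViewAs(catalog: TableOnlyCatalog, ident: String): Unit = {
    catalog.loadTable(ident)
    if (!catalog.supportsView) throw new MissingViewsAbility(catalog.name)
  }
}
```

With an empty catalog the call fails at the `loadTable` step with the not-found tag; once a view entry is seeded, the same call gets past resolution and fails with `MISSING_CATALOG_ABILITY.VIEWS`, which is the rejection the test name promises.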
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.