This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 8631a42fd4cd [SPARK-56643][CONNECT][TEST] Add DSv2 temp view with stored plan tests using Spark Connect mode
8631a42fd4cd is described below
commit 8631a42fd4cdafd19f5398ee2296ada4a0a5c82d
Author: Thang Long Vu <[email protected]>
AuthorDate: Tue May 26 13:45:21 2026 +0800
[SPARK-56643][CONNECT][TEST] Add DSv2 temp view with stored plan tests using Spark Connect mode
### What changes were proposed in this pull request?
Extract the 21 DSv2 temp view with stored plan tests from
`DataSourceV2DataFrameSuite` into a shared trait
`DSv2TempViewWithStoredPlanTests`, and add a Connect-mode runner
`DataSourceV2TempViewConnectSuite` so the same tests run in both classic and
Connect modes.
The shared trait covers these scenarios:
1. **Session and external writes** (scenarios 1.1, 1.2, 1.2+cache): Temp
view with filter reflects new data.
2. **Adding columns** (scenarios 2.1, 2.2, 2.2+cache): Temp view preserves
original schema after ADD COLUMN.
3. **Removing columns** (scenarios 3.1, 3.2, 3.2+cache): Temp view detects
column removal.
4. **Drop and recreate table** (scenarios 4.1, 4.2, 4.2+cache): Temp view
resolves to recreated table.
5. **Drop and re-add column with same type** (scenarios 5.1, 5.2,
5.2+cache): Schema validation passes, view continues working with null values.
6. **Drop and re-add column with different type** (scenarios 6.1, 6.2,
6.2+cache): Temp view detects type change.
7. **Type widening** (scenarios 7.1, 7.2, 7.2+cache): Temp view detects INT
to BIGINT type change.
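The outcomes above are consistent with a simple rule: every column the view captured must still exist with the same type, while columns added later are ignored. The sketch below is a minimal, hypothetical Scala model of that rule. `ColumnCheck` and `incompatibilities` are illustrative names, not Spark APIs, and this is not Spark's implementation. Only the two message formats are taken from the `errors` parameters asserted in scenarios 3 and 6.

```scala
object ColumnCheck {
  // Hypothetical model: a column is (name, SQL type name), e.g. ("salary", "INT").
  type Col = (String, String)

  /** Returns one message per captured column that is now missing or has a different type.
    * Columns that exist only in `current` (for example after ADD COLUMN) are ignored.
    */
  def incompatibilities(captured: Seq[Col], current: Seq[Col]): Seq[String] = {
    val currentTypes: Map[String, String] = current.toMap
    captured.flatMap { case (name, capturedType) =>
      currentTypes.get(name) match {
        case None =>
          Some(s"- `$name` $capturedType has been removed")
        case Some(newType) if newType != capturedType =>
          Some(s"- `$name` type has changed from $capturedType to $newType")
        case Some(_) =>
          None // same name and type, e.g. dropped and re-added as INT
      }
    }
  }
}
```

For example, `incompatibilities(Seq("id" -> "INT", "salary" -> "INT"), Seq("id" -> "INT"))` yields a single message reporting that `salary` was removed, mirroring scenario 3.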
Each test creates a DSv2 table, inserts initial data, builds a temp view
with a filter (`salary < 999`) to demonstrate stored plan behavior, and
verifies the view after table modifications. External writes use the direct
catalog API (`loadTable` + `withData`) to simulate writes from outside the
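session. Caching connector variants (`+cache`) use `CachingInMemoryTableCatalog` to verify
stale-cache behavior: external changes stay invisible to the view until `REFRESH TABLE`
invalidates the connector cache.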
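As a rough picture of what the caching (`+cache`) variants check, the hypothetical sketch below models a connector that memoizes the first table snapshot it loads and only re-reads storage after an explicit invalidation, which stands in for `REFRESH TABLE`. `Backend`, `CachingCatalog`, `invalidate`, and `view` are made-up names; this is not how `CachingInMemoryTableCatalog` is implemented.

```scala
import scala.collection.mutable

object StaleCacheDemo {
  type Rec = (Int, Int) // (id, salary)

  // Hypothetical shared storage that "external" writers mutate directly.
  final class Backend {
    private val rows = mutable.ArrayBuffer.empty[Rec]
    def append(row: Rec): Unit = { rows += row }
    def snapshot: Seq[Rec] = rows.toList
  }

  // Hypothetical caching connector: serves its cached snapshot until invalidated.
  final class CachingCatalog(backend: Backend) {
    private var cached: Option[Seq[Rec]] = None
    def load(): Seq[Rec] = cached.getOrElse {
      val fresh = backend.snapshot
      cached = Some(fresh)
      fresh
    }
    // Plays the role of REFRESH TABLE: the next load() re-reads the backend.
    def invalidate(): Unit = { cached = None }
  }

  // The temp view re-reads through the catalog and re-applies its stored filter.
  def view(catalog: CachingCatalog): Seq[Rec] =
    catalog.load().filter { case (_, salary) => salary < 999 }
}
```

Mirroring scenario 1.2+cache: after `backend.append((2, 200))` the view still returns only `(1, 100)`, and it returns both rows once `invalidate()` has been called.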
This is the Spark Connect counterpart of #55540.
#### Refactoring
- **`DSv2ExternalMutationTestBase`** (new): Abstract base trait with
session-agnostic hooks (`testPrefix`, `withTestSession`, `checkRows`, `getTableCatalog`,
`withTestTableAndViews`) and a concrete `externalAppend` helper, so the same test logic
runs in both classic and Connect modes.
- **`DSv2TempViewWithStoredPlanTests`** (new): Shared trait containing the
21 test methods, extracted from `DataSourceV2DataFrameSuite`.
- **`DataSourceV2TempViewConnectSuite`** (new): Connect-mode runner that
provides Connect-specific session, catalog access, and result comparison.
- **`DataSourceV2DataFrameSuite`** (modified): Mixes in
`DSv2TempViewWithStoredPlanTests` instead of inlining the tests; adds
classic-mode implementations of the abstract hooks.
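The split between abstract hooks and shared test bodies is a template-method arrangement: the shared trait only calls hooks, and each runner supplies its own implementations. The sketch below is a simplified, Spark-free illustration with hypothetical names (`HookBase`, `SharedScenarios`, `ClassicRunner`, `ConnectLikeRunner`); the real traits register tests with ScalaTest's `test(...)` and operate on a `SparkSession` and `DataFrame`s.

```scala
// Hypothetical base trait: the hooks each runner must supply.
trait HookBase {
  protected def testPrefix: String
  protected def checkRows(actual: => Seq[Int], expected: Seq[Int]): Unit
}

// Hypothetical shared trait: scenario bodies are written only against the hooks.
trait SharedScenarios extends HookBase {
  def testNames: Seq[String] = Seq(s"${testPrefix}view reflects session write")
  def runAll(): Unit = checkRows(actual = Seq(2, 1), expected = Seq(1, 2))
}

// Classic-style runner: compares directly, ignoring order.
class ClassicRunner extends SharedScenarios {
  override protected def testPrefix: String = ""
  override protected def checkRows(actual: => Seq[Int], expected: Seq[Int]): Unit =
    assert(actual.sorted == expected.sorted, s"$actual != $expected")
}

// Connect-style runner: materializes the result first (like df.collect()), then compares.
class ConnectLikeRunner extends SharedScenarios {
  override protected def testPrefix: String = "[connect] "
  override protected def checkRows(actual: => Seq[Int], expected: Seq[Int]): Unit = {
    val collected = actual.toVector
    assert(collected.sorted == expected.sorted, s"$collected != $expected")
  }
}
```

Adding a new scenario then means editing only the shared trait; both runners pick it up with their own prefix and comparison.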
### Why are the changes needed?
The existing temp view tests in `DataSourceV2DataFrameSuite` use
classic-only APIs (`checkAnswer`, `catalog()` helper) and cannot run in Connect
mode. Extracting them into a shared trait enables Connect parity testing.
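Because `checkAnswer` cannot take a Connect client DataFrame, the Connect runner collects the rows and compares them by value while ignoring order (it delegates to `QueryTest.sameRows`). The sketch below is a simplified, hypothetical stand-in for that kind of comparison, not the `QueryTest` code. Rows are modeled as `Seq[Any]` so values like `Row(1, null)` are representable, and duplicates must match in count.

```scala
object RowCompare {
  // Hypothetical order-agnostic comparison: None on match, Some(message) on mismatch.
  def sameRowsIgnoringOrder(expected: Seq[Seq[Any]], actual: Seq[Seq[Any]]): Option[String] = {
    // Count occurrences so duplicate rows must match too (multiset equality).
    def counts(rows: Seq[Seq[Any]]): Map[Seq[Any], Int] =
      rows.groupBy(identity).map { case (row, group) => row -> group.size }
    if (counts(expected) == counts(actual)) None
    else Some(s"Results do not match.\nExpected: $expected\nActual:   $actual")
  }
}
```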
### Does this PR introduce _any_ user-facing change?
No. This PR only adds tests and refactors test infrastructure.
### How was this patch tested?
All 21 shared tests pass in both modes:
```
build/sbt 'sql/testOnly *DataSourceV2DataFrameSuite'
build/sbt 'connect/testOnly *DataSourceV2TempViewConnectSuite'
```
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes #55571 from longvu-db/dsv2-connect-pr1-temp-views-clone.
Authored-by: Thang Long Vu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6993c9148090425af2124edfd120f1d6b79c403c)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../connect/DataSourceV2TempViewConnectSuite.scala | 83 +++
.../connector/DSv2ExternalMutationTestBase.scala | 76 +++
.../DSv2TempViewWithStoredPlanTests.scala | 588 ++++++++++++++++++++
.../sql/connector/DataSourceV2DataFrameSuite.scala | 613 ++-------------------
4 files changed, 781 insertions(+), 579 deletions(-)
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
new file mode 100644
index 000000000000..ce947379b233
--- /dev/null
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
+import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog}
+
+/**
+ * Connect-mode runner for [[DSv2TempViewWithStoredPlanTests]]. All test logic lives in the shared
+ * trait; this class only provides the Connect-specific session, catalog access, and result
+ * comparison.
+ */
+class DataSourceV2TempViewConnectSuite
+ extends SparkConnectServerTest
+ with DSv2TempViewWithStoredPlanTests {
+
+ override def sparkConf: SparkConf = super.sparkConf
+ .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.testcat.copyOnLoad", "true")
+ .set("spark.sql.catalog.cachingcat", classOf[CachingInMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
+
+ override protected def testPrefix: String = "[connect] "
+
+ override protected def withTestSession(fn: SparkSession => Unit): Unit =
+ withSession(fn)
+
+ // Cannot use QueryTest.checkAnswer directly because it accesses df.logicalPlan,
+ // df.queryExecution, and df.materializedRdd, which are not available on Connect *client*
+ // DataFrames (they throw ConnectClientUnsupportedErrors). Note: checkAnswer IS usable from
+ // Connect server tests that operate on classic server-side DataFrames, but in this suite
+ // `df` is a Connect client DataFrame returned by session.table() / session.sql().
+ // Instead, collect the rows and delegate to QueryTest.sameRows, which is the same
+ // value-based, order-agnostic comparison that checkAnswer uses internally.
+ override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit =
+ QueryTest.sameRows(expected, df.collect().toSeq).foreach(msg => fail(msg))
+
+ override protected def getTableCatalog[C <: TableCatalog: ClassTag](
+ session: SparkSession,
+ catalogName: String): C = {
+ val serverSession = getServerSession(session)
+ val catalog = serverSession.sessionState.catalogManager.catalog(catalogName)
+ val ct = implicitly[ClassTag[C]]
+ require(
+ ct.runtimeClass.isInstance(catalog),
+ s"Expected ${ct.runtimeClass.getName} but got ${catalog.getClass.getName}")
+ catalog.asInstanceOf[C]
+ }
+
+ // No explicit clearCache() for cachingcat is needed here, unlike the classic suite.
+ // Each withSession call creates a freshly isolated SparkSession on the server side
+ // (via SparkConnectSessionManager.newIsolatedSession), and afterEach invalidates all
+ // sessions, so the CachingInMemoryTableCatalog instance is per-test.
+ override protected def withTestTableAndViews(
+ session: SparkSession,
+ table: String,
+ views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
+ try { fn }
+ finally {
+ views.foreach(v => session.sql(s"DROP VIEW IF EXISTS $v").collect())
+ session.sql(s"DROP TABLE IF EXISTS $table").collect()
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
new file mode 100644
index 000000000000..2e60c24c4460
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Identifier, InMemoryBaseTable, TableCatalog, TableWritePrivilege}
+
+/**
+ * Base trait for DSv2 tests that involve external table mutations (writes, schema changes,
+ * drop/recreate) via the catalog API.
+ *
+ * Provides abstract methods so that the same test scenarios can run in both classic mode
+ * (where the test session IS the server session) and Connect mode (where the test session
+ * is a Connect client and catalog access requires the server session).
+ *
+ * Concrete suites override the abstract methods and mix in the test trait
+ * [[DSv2TempViewWithStoredPlanTests]].
+ */
+trait DSv2ExternalMutationTestBase extends QueryTest {
+
+ /** Prefix for test names, e.g. "" or "[connect] ". */
+ protected def testPrefix: String
+
+ /** Execute a test body with a session. */
+ protected def withTestSession(fn: SparkSession => Unit): Unit
+
+ /**
+ * Assert that a DataFrame's rows match the expected rows (order-agnostic).
+ */
+ protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit
+
+ /**
+ * Get a [[TableCatalog]] by name from the underlying session.
+ */
+ protected def getTableCatalog[C <: TableCatalog: ClassTag](
+ session: SparkSession,
+ catalogName: String): C
+
+ /** Cleanup wrapper: drop views and the table after the test body, even on failure. */
+ protected def withTestTableAndViews(
+ session: SparkSession,
+ table: String,
+ views: Seq[String] = Seq.empty)(fn: => Unit): Unit
+
+ /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */
+ protected def externalAppend(
+ catalog: TableCatalog,
+ ident: Identifier,
+ row: InternalRow): Unit = {
+ val extTable = catalog
+ .loadTable(ident, util.Set.of(TableWritePrivilege.INSERT))
+ .asInstanceOf[InMemoryBaseTable]
+ val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
+ extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
new file mode 100644
index 000000000000..eb40e3ac056f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
+
+/**
+ * Shared temp view with stored plan tests for DSv2 tables. These tests verify that temp views
+ * backed by DSv2 tables correctly handle data changes, schema changes, and table recreation,
+ * both via session SQL and external catalog mutations.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode, commands (DDL, INSERT,
+ * REFRESH TABLE) already execute eagerly inside `session.sql`, so the extra `.collect()` is
+ * harmless.
+ */
+trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase {
+
+ private val T = "testcat.ns1.ns2.tbl"
+ private val CT = "cachingcat.ns1.ns2.tbl"
+ private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+ // Scenario 1.1 (session write)
+ test(s"${testPrefix}temp view with stored plan reflects session write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+ checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 1.2 (external write)
+ test(s"${testPrefix}temp view with stored plan reflects external write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 1.2 connector w/ cache (external write, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200))
+
+ // Caching connector returns stale table: external write invisible
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, external write becomes visible
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 2.1 (session ADD COLUMN)
+ test(s"${testPrefix}temp view with stored plan preserves schema after session ADD COLUMN") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ session.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect()
+ session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect()
+
+ // view preserves original 2-column schema, filter still applied
+ checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 2.2 (external ADD COLUMN)
+ test(s"${testPrefix}temp view with stored plan preserves schema after external ADD COLUMN") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // external schema change via catalog API
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+ catalog.alterTable(testIdent, addCol)
+
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
+
+ // view preserves original 2-column schema, filter still applied
+ checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external ADD COLUMN") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+ catalog.alterTable(testIdent, addCol)
+
+ externalAppend(catalog = catalog, ident = testIdent, row = InternalRow(2, 200, -1))
+
+ // Caching connector returns stale table: external changes invisible
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 3.1 (session column removal)
+ test(s"${testPrefix}temp view with stored plan detects session column removal") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+ }
+
+ // Scenario 3.2 (external column removal)
+ test(s"${testPrefix}temp view with stored plan detects external column removal") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog.alterTable(testIdent, dropCol)
+
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+ }
+
+ // Scenario 3.2 connector w/ cache (external column removal, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external column removal") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog.alterTable(testIdent, dropCol)
+
+ // Caching connector returns stale table: column removal invisible, no error
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, column removal detected
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+ }
+
+ // Scenario 4.1 (session drop and recreate table)
+ test(s"${testPrefix}temp view with stored plan resolves to session-recreated table") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val originalTableId = catalog.loadTable(testIdent).id
+
+ session.sql(s"DROP TABLE $T").collect()
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+
+ val newTableId = catalog.loadTable(testIdent).id
+ assert(originalTableId != newTableId)
+
+ // view resolves to the new empty table
+ checkRows(session.table("v"), Seq.empty)
+
+ session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+ checkRows(session.table("v"), Seq(Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 4.2 (external drop and recreate table)
+ test(s"${testPrefix}temp view with stored plan resolves to externally recreated table") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val originalTableId = catalog.loadTable(testIdent).id
+
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ val newTableId = catalog.loadTable(testIdent).id
+ assert(originalTableId != newTableId)
+
+ // view resolves to the new empty table
+ checkRows(session.table("v"), Seq.empty)
+
+ session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+ checkRows(session.table("v"), Seq(Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external drop/recreate") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // Caching connector returns stale table: drop/recreate invisible
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkRows(session.table("v"), Seq.empty)
+ }
+ }
+ }
+
+ // Scenario 5.1 (session drop and re-add column with same type, multiple views)
+ test(s"${testPrefix}temp view with stored plan after session drop and re-add column same type" +
+ " with unfiltered view") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v", "v_no_filter", "v_filter_is_null")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(T).createOrReplaceTempView("v_no_filter")
+ session.table(T).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+ checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+ checkRows(session.table("v_filter_is_null"), Seq.empty)
+
+ // drop and re-add column with same name and type
+ session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+ session.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect()
+
+ // salary values are now null, so the filtered view returns nothing
+ checkRows(session.table("v"), Seq.empty)
+ // unfiltered view returns rows with null salary
+ checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+ // IS NULL filter now matches all rows
+ checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+ }
+ }
+ }
+
+ // Scenario 5.2 (external drop and re-add column with same type)
+ test(s"${testPrefix}temp view with stored plan after external drop and re-add column " +
+ "same type") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v", "v_no_filter", "v_filter_is_null")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(T).createOrReplaceTempView("v_no_filter")
+ session.table(T).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+ checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+ checkRows(session.table("v_filter_is_null"), Seq.empty)
+
+ // external drop and re-add column via catalog API
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+ catalog.alterTable(testIdent, dropCol, addCol)
+
+ // salary values are now null, so the filtered view returns nothing
+ checkRows(session.table("v"), Seq.empty)
+ // unfiltered view returns rows with null salary
+ checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+ // IS NULL filter now matches all rows
+ checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+ }
+ }
+ }
+
+ // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external drop/re-add column " +
+ "same type") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+ catalog.alterTable(testIdent, dropCol, addCol)
+
+ // Caching connector returns stale table: column drop/re-add invisible
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, salary values are null
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkRows(session.table("v"), Seq.empty)
+ }
+ }
+ }
+
+ // Scenario 6.1 (session drop and re-add column with different type)
+ test(s"${testPrefix}temp view with stored plan detects session column type change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+ session.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect()
+
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+ }
+
+ // Scenario 6.2 (external drop and re-add column with different type)
+ test(s"${testPrefix}temp view with stored plan detects external column type change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+ catalog.alterTable(testIdent, dropCol, addCol)
+
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+ }
+
+ // Scenario 6.2 connector w/ cache (external column type change, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external column type change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+ catalog.alterTable(testIdent, dropCol, addCol)
+
+ // Caching connector returns stale table: type change invisible, no error
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, type change detected
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+ }
+
+ // Scenario 7.1 (session type widening from INT to BIGINT)
+ test(s"${testPrefix}temp view with stored plan detects session type widening") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ session.sql(s"ALTER TABLE $T ALTER COLUMN salary TYPE LONG").collect()
+
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to BIGINT"))
+ }
+ }
+ }
+
+ // Scenario 7.2 (external type widening from INT to BIGINT)
+ test(s"${testPrefix}temp view with stored plan detects external type widening") {
+ withTestSession { session =>
+ withTestTableAndViews(session, T, Seq("v")) {
+ session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+ catalog.alterTable(testIdent, updateType)
+
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to BIGINT"))
+ }
+ }
+ }
+
+ // Scenario 7.2 connector w/ cache (external type widening, caching connector)
+ test(s"${testPrefix}connector w/ cache: temp view stale after external type widening") {
+ withTestSession { session =>
+ withTestTableAndViews(session, CT, Seq("v")) {
+ session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo").collect()
+ session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+ session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
+ val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+ catalog.alterTable(testIdent, updateType)
+
+ // Caching connector returns stale table: type change invisible, no error
+ checkRows(session.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, type change detected
+ session.sql(s"REFRESH TABLE $CT").collect()
+ checkError(
+ exception = intercept[AnalysisException] { session.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to BIGINT"))
+ }
+ }
+ }
+}
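The type-change and type-widening tests above expect the same error shape: one line per stored column, reporting either a removal or a type change. The Scala sketch below models that comparison under stated simplifications. Columns are matched by exact name, types are plain SQL type strings, and `Col` and `incompatibleChanges` are hypothetical names, not Spark's implementation. Columns that exist only in the current table are ignored, which is consistent with the ADD COLUMN scenarios where the view keeps its original schema. A column dropped and re-added with the same type also produces no error, matching the drop/re-add same-type scenarios.

```scala
// Hypothetical column snapshot: a name plus its SQL type rendered as a string.
final case class Col(name: String, sqlType: String)

// Compares the columns captured in a view's stored plan with the table's
// current columns and returns one message per incompatible change.
// Assumption: columns are matched by exact name; extra current columns are
// ignored because the stored plan keeps its original projection.
def incompatibleChanges(stored: Seq[Col], current: Seq[Col]): Seq[String] = {
  val currentByName = current.map(c => c.name -> c).toMap
  stored.flatMap { s =>
    currentByName.get(s.name) match {
      case None =>
        Some(s"- `${s.name}` ${s.sqlType} has been removed")
      case Some(c) if c.sqlType != s.sqlType =>
        Some(s"- `${s.name}` type has changed from ${s.sqlType} to ${c.sqlType}")
      case Some(_) =>
        None
    }
  }
}
```

For example, a stored schema of `id INT, salary INT` compared against a current schema of `id INT, salary BIGINT` yields a single type-change line for `salary`.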
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 4fc93609fb41..13f8a3455480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -21,14 +21,14 @@ import java.util
import java.util.Collections
import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
@@ -46,7 +46,8 @@ import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.unsafe.types.UTF8String
class DataSourceV2DataFrameSuite
- extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) {
+ extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false)
+ with DSv2TempViewWithStoredPlanTests {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import testImplicits._
@@ -87,6 +88,35 @@ class DataSourceV2DataFrameSuite
catalog.asInstanceOf[InMemoryTableCatalog]
}
+ // DSv2ExternalMutationTestBase implementations for classic mode
+ override protected def testPrefix: String = ""
+
+ override protected def withTestSession(fn: SparkSession => Unit): Unit = fn(spark)
+
+ override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit =
+ checkAnswer(df, expected)
+
+ override protected def getTableCatalog[C <: TableCatalog: ClassTag](
+ session: SparkSession,
+ catalogName: String): C = {
+ val c = catalog(catalogName)
+ val ct = implicitly[ClassTag[C]]
+ require(
+ ct.runtimeClass.isInstance(c),
+ s"Expected ${ct.runtimeClass.getName} but got ${c.getClass.getName}")
+ c.asInstanceOf[C]
+ }
+
+ override protected def withTestTableAndViews(
+ session: SparkSession,
+ table: String,
+ views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
+ withTable(table) {
+ try { fn }
+ finally { views.foreach(v => session.sql(s"DROP VIEW IF EXISTS $v")) }
+ }
+ }
+
override def verifyTable(tableName: String, expected: DataFrame): Unit = {
checkAnswer(spark.table(tableName), expected)
}
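The classic-mode `getTableCatalog` override above checks the requested catalog type at runtime through a `ClassTag` before casting. If the wrong catalog type is requested, the test fails immediately with a readable message instead of a `ClassCastException` later on. Below is a self-contained sketch of the same pattern. The catalog classes and the `Map`-based lookup are hypothetical stand-ins for Spark's connector catalogs and the suite's `catalog(name)` helper.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-ins for connector catalogs; only the runtime class matters here.
trait TableCatalog { def name: String }
final class InMemoryTableCatalog(val name: String) extends TableCatalog
final class CachingInMemoryTableCatalog(val name: String) extends TableCatalog

// Looks up a catalog by name and downcasts it to C. A mismatch fails fast
// with an IllegalArgumentException from `require` rather than a later
// ClassCastException.
def getTableCatalog[C <: TableCatalog: ClassTag](
    catalogs: Map[String, TableCatalog],
    catalogName: String): C = {
  val c = catalogs(catalogName)
  val ct = implicitly[ClassTag[C]]
  require(
    ct.runtimeClass.isInstance(c),
    s"Expected ${ct.runtimeClass.getName} but got ${c.getClass.getName}")
  c.asInstanceOf[C]
}
```

The context bound `C <: TableCatalog: ClassTag` lets the shared trait declare one generic accessor. Each runner (classic or Connect) can then resolve the catalog in its own way while keeping the same type check.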
@@ -2989,581 +3019,6 @@ class DataSourceV2DataFrameSuite
}
}
- // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
- // Each test creates a DSv2 table with initial data, builds a temp view with a filter
- // (to demonstrate that the stored plan is non-trivial), and then verifies the view
- // behavior after various table modifications (session or external).
-
- /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */
- // The row layout must match the current table column order.
- private def externalAppend(
- catalogName: String,
- ident: Identifier,
- row: InternalRow): Unit = {
- val extTable = catalog(catalogName).loadTable(ident,
- util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
- val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
- extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
- }
-
- // Scenario 1.1 (session write)
- test("temp view with stored plan reflects session write") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- sql(s"INSERT INTO $t VALUES (2, 200)")
-
- checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
- }
- }
-
- // Scenario 1.2 (external write)
- test("temp view with stored plan reflects external write") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external writer adds (2, 200) via direct catalog API
- externalAppend(
- catalogName = "testcat",
- ident = ident,
- row = InternalRow(2, 200))
-
- checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
- }
- }
-
- // Scenario 1.2 connector w/ cache (external write, caching connector)
- test("connector w/ cache: temp view stale after external write") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external writer adds (2, 200) via catalog API (bypasses cache)
- externalAppend(
- catalogName = "cachingcat",
- ident = ident,
- row = InternalRow(2, 200))
-
- // Caching connector returns stale table: external write invisible
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, external write becomes visible
- sql(s"REFRESH TABLE $t")
- checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
- }
- }
-
- // Scenario 2.1 (session ADD COLUMN)
- test("temp view with stored plan preserves schema after session ADD COLUMN") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
- sql(s"INSERT INTO $t VALUES (2, 200, -1)")
-
- // view preserves original 2-column schema, filter still applied
- checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
- }
- }
-
- // Scenario 2.2 (external ADD COLUMN)
- test("temp view with stored plan preserves schema after external ADD COLUMN") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external schema change via catalog API
- val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
- catalog("testcat").alterTable(ident, addCol)
-
- // external writer adds data with new schema
- externalAppend(
- catalogName = "testcat",
- ident = ident,
- row = InternalRow(2, 200, -1))
-
- // view preserves original 2-column schema, filter still applied
- checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
- }
- }
-
- // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
- test("connector w/ cache: temp view stale after external ADD COLUMN") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external schema change + data via catalog API
- val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
- catalog("cachingcat").alterTable(ident, addCol)
-
- externalAppend(
- catalogName = "cachingcat",
- ident = ident,
- row = InternalRow(2, 200, -1))
-
- // Caching connector returns stale table: external changes invisible
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
- sql(s"REFRESH TABLE $t")
- checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
- }
- }
-
- // Scenario 3.1 (session column removal)
- test("temp view with stored plan detects session column removal") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // session schema change via SQL
- sql(s"ALTER TABLE $t DROP COLUMN salary")
-
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` INT has been removed"))
- }
- }
-
- // Scenario 3.2 (external column removal)
- test("temp view with stored plan detects external column removal") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external schema change via catalog API
- val dropCol = TableChange.deleteColumn(Array("salary"), false)
- catalog("testcat").alterTable(ident, dropCol)
-
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` INT has been removed"))
- }
- }
-
- // Scenario 3.2 connector w/ cache (external column removal, caching connector)
- test("connector w/ cache: temp view stale after external column removal") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external column removal via catalog API
- val dropCol = TableChange.deleteColumn(Array("salary"), false)
- catalog("cachingcat").alterTable(ident, dropCol)
-
- // Caching connector returns stale table: column removal invisible, no error
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, column removal detected
- sql(s"REFRESH TABLE $t")
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` INT has been removed"))
- }
- }
-
- // Scenario 4.1 (session drop and recreate table)
- test("temp view with stored plan resolves to session-recreated table") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- val originalTableId = catalog("testcat").loadTable(ident).id
-
- // session drop and recreate via SQL
- sql(s"DROP TABLE $t")
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-
- val newTableId = catalog("testcat").loadTable(ident).id
- assert(originalTableId != newTableId)
-
- // view resolves to the new empty table
- checkAnswer(spark.table("v"), Seq.empty)
-
- // insert new data and verify the view picks it up
- sql(s"INSERT INTO $t VALUES (2, 200)")
- checkAnswer(spark.table("v"), Seq(Row(2, 200)))
- }
- }
-
- // Scenario 4.2 (external drop and recreate table)
- test("temp view with stored plan resolves to externally recreated table") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- val originalTableId = catalog("testcat").loadTable(ident).id
-
- // external drop and recreate via catalog API
- catalog("testcat").dropTable(ident)
- catalog("testcat").createTable(
- ident,
- new TableInfo.Builder()
- .withColumns(Array(
- Column.create("id", IntegerType),
- Column.create("salary", IntegerType)))
- .build())
-
- val newTableId = catalog("testcat").loadTable(ident).id
- assert(originalTableId != newTableId)
-
- // view resolves to the new empty table
- checkAnswer(spark.table("v"), Seq.empty)
-
- // insert new data and verify the view picks it up
- sql(s"INSERT INTO $t VALUES (2, 200)")
- checkAnswer(spark.table("v"), Seq(Row(2, 200)))
- }
- }
-
- // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
- test("connector w/ cache: temp view stale after external drop/recreate") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external drop and recreate via catalog API
- catalog("cachingcat").dropTable(ident)
- catalog("cachingcat").createTable(
- ident,
- new TableInfo.Builder()
- .withColumns(Array(
- Column.create("id", IntegerType),
- Column.create("salary", IntegerType)))
- .build())
-
- // Caching connector returns stale table: drop/recreate invisible
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
- sql(s"REFRESH TABLE $t")
- checkAnswer(spark.table("v"), Seq.empty)
- }
- }
-
- // Scenario 5.1 (session drop and re-add column with same type, multiple views)
- test("temp view with stored plan after session drop and re-add column same type" +
- " with unfiltered view") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- spark.table(t).createOrReplaceTempView("v_no_filter")
- spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
- checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
- checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
-
- // drop and re-add column with same name and type
- sql(s"ALTER TABLE $t DROP COLUMN salary")
- sql(s"ALTER TABLE $t ADD COLUMN salary INT")
-
- // salary values are now null, so the filtered view returns nothing
- checkAnswer(spark.table("v"), Seq.empty)
- // unfiltered view returns rows with null salary
- checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
- // IS NULL filter now matches all rows
- checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
- }
- }
-
- // Scenario 5.2 (external drop and re-add column with same type)
- test("temp view with stored plan after external drop and re-add column same type") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- spark.table(t).createOrReplaceTempView("v_no_filter")
- spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
- checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
- checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
-
- // external drop and re-add column via catalog API
- val dropCol = TableChange.deleteColumn(Array("salary"), false)
- val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
- catalog("testcat").alterTable(ident, dropCol, addCol)
-
- // salary values are now null, so the filtered view returns nothing
- checkAnswer(spark.table("v"), Seq.empty)
- // unfiltered view returns rows with null salary
- checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
- // IS NULL filter now matches all rows
- checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
- }
- }
-
- // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector)
- test("connector w/ cache: temp view stale after external drop/re-add column same type") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external drop and re-add column with same type via catalog API
- val dropCol = TableChange.deleteColumn(Array("salary"), false)
- val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
- catalog("cachingcat").alterTable(ident, dropCol, addCol)
-
- // Caching connector returns stale table: column drop/re-add invisible
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, salary values are null
- sql(s"REFRESH TABLE $t")
- checkAnswer(spark.table("v"), Seq.empty)
- }
- }
-
- // Scenario 6.1 (session drop and re-add column with different type)
- test("temp view with stored plan detects session column type change") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // drop and re-add column with same name but different type
- sql(s"ALTER TABLE $t DROP COLUMN salary")
- sql(s"ALTER TABLE $t ADD COLUMN salary STRING")
-
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` type has changed from INT to STRING"))
- }
- }
-
- // Scenario 6.2 (external drop and re-add column with different type)
- test("temp view with stored plan detects external column type change") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external drop and re-add column with different type via catalog API
- val dropCol = TableChange.deleteColumn(Array("salary"), false)
- val addCol = TableChange.addColumn(Array("salary"), StringType, true)
- catalog("testcat").alterTable(ident, dropCol, addCol)
-
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` type has changed from INT to STRING"))
- }
- }
-
- // Scenario 6.2 connector w/ cache (external column type change, caching connector)
- test("connector w/ cache: temp view stale after external column type change") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external drop and re-add column with different type via catalog API
- val dropCol = TableChange.deleteColumn(Array("salary"), false)
- val addCol = TableChange.addColumn(Array("salary"), StringType, true)
- catalog("cachingcat").alterTable(ident, dropCol, addCol)
-
- // Caching connector returns stale table: type change invisible, no error
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, type change detected
- sql(s"REFRESH TABLE $t")
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` type has changed from INT to STRING"))
- }
- }
-
- // Scenario 7.1 (session type widening from INT to BIGINT)
- test("temp view with stored plan detects session type widening") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // session type widening via SQL
- sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE LONG")
-
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` type has changed from INT to BIGINT"))
- }
- }
-
- // Scenario 7.2 (external type widening from INT to BIGINT)
- test("temp view with stored plan detects external type widening") {
- val t = "testcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // widen salary type from INT to BIGINT via catalog API
- val updateType = TableChange.updateColumnType(Array("salary"), LongType)
- catalog("testcat").alterTable(ident, updateType)
-
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` type has changed from INT to BIGINT"))
- }
- }
-
- // Scenario 7.2 connector w/ cache (external type widening, caching connector)
- test("connector w/ cache: temp view stale after external type widening") {
- val t = "cachingcat.ns1.ns2.tbl"
- val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // external type widening via catalog API
- val updateType = TableChange.updateColumnType(Array("salary"), LongType)
- catalog("cachingcat").alterTable(ident, updateType)
-
- // Caching connector returns stale table: type change invisible, no error
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
- // REFRESH TABLE invalidates the connector cache, type change detected
- sql(s"REFRESH TABLE $t")
- checkError(
- exception = intercept[AnalysisException] { spark.table("v").collect() },
- condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
- parameters = Map(
- "viewName" -> "`v`",
- "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
- "colType" -> "data",
- "errors" -> "- `salary` type has changed from INT to BIGINT"))
- }
- }
-
test("cached DSv2 table DataFrame is refreshed and reused after insert") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
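The "connector w/ cache" tests rely on a catalog that keeps serving the table metadata it loaded first until REFRESH TABLE invalidates the connector cache. Until then, external writes and schema changes stay invisible to the temp view. The sketch below models only that caching behavior. `Store`, `TableMeta`, and `CachingCatalog` are hypothetical. The `invalidateTable` name follows the DSv2 `TableCatalog.invalidateTable` hook; the method is not copied from `CachingInMemoryTableCatalog`'s actual code.

```scala
import scala.collection.mutable

// Hypothetical table metadata: a version counter plus column descriptions.
final case class TableMeta(version: Int, columns: Seq[String])

// Hypothetical backing store that "external" writers mutate directly.
final class Store {
  private val tables = mutable.Map.empty[String, TableMeta]
  def put(name: String, meta: TableMeta): Unit = tables(name) = meta
  def get(name: String): TableMeta = tables(name)
}

// Serves the first metadata it loads for each table until that entry is
// invalidated, so changes made directly in the store stay invisible.
final class CachingCatalog(store: Store) {
  private val cache = mutable.Map.empty[String, TableMeta]

  def loadTable(name: String): TableMeta =
    cache.getOrElseUpdate(name, store.get(name))

  // Drops the cached entry so the next loadTable reads the store again.
  def invalidateTable(name: String): Unit = {
    cache.remove(name)
    ()
  }
}
```

The tests map onto this model roughly as follows. The first `loadTable` corresponds to view creation, the external `alterTable` corresponds to `store.put`, and REFRESH TABLE corresponds to `invalidateTable`.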