This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 8631a42fd4cd [SPARK-56643][CONNECT][TEST] Add DSv2 temp view with 
stored plan tests using Spark Connect mode
8631a42fd4cd is described below

commit 8631a42fd4cdafd19f5398ee2296ada4a0a5c82d
Author: Thang Long Vu <[email protected]>
AuthorDate: Tue May 26 13:45:21 2026 +0800

    [SPARK-56643][CONNECT][TEST] Add DSv2 temp view with stored plan tests 
using Spark Connect mode
    
    ### What changes were proposed in this pull request?
    
    Extract the 21 DSv2 temp view with stored plan tests from 
`DataSourceV2DataFrameSuite` into a shared trait 
`DSv2TempViewWithStoredPlanTests`, and add a Connect-mode runner 
`DataSourceV2TempViewConnectSuite` so the same tests run in both classic and 
Connect modes.
    
    The shared trait covers these scenarios:
    
    1. **Session and external writes** (scenarios 1.1, 1.2, 1.2+cache): Temp 
view with filter reflects new data.
    2. **Adding columns** (scenarios 2.1, 2.2, 2.2+cache): Temp view preserves 
original schema after ADD COLUMN.
    3. **Removing columns** (scenarios 3.1, 3.2, 3.2+cache): Temp view detects 
column removal.
    4. **Drop and recreate table** (scenarios 4.1, 4.2, 4.2+cache): Temp view 
resolves to recreated table.
    5. **Drop and re-add column with same type** (scenarios 5.1, 5.2, 
5.2+cache): Schema validation passes, view continues working with null values.
    6. **Drop and re-add column with different type** (scenarios 6.1, 6.2, 
6.2+cache): Temp view detects type change.
    7. **Type widening** (scenarios 7.1, 7.2, 7.2+cache): Temp view detects INT 
to BIGINT type change.
    
    Each test creates a DSv2 table, inserts initial data, builds a temp view 
with a filter (`salary < 999`) to demonstrate stored plan behavior, and 
verifies the view after table modifications. External writes use the direct 
catalog API (`loadTable` + `withData`) to simulate writes from outside the 
session. Caching connector variants use `CachingInMemoryTableCatalog` to verify 
stale-cache behavior.
    
    This is the Spark Connect counterpart of #55540.
    
    #### Refactoring
    
    - **`DSv2ExternalMutationTestBase`** (new): Abstract base trait with 
session-agnostic hooks (`withTestSession`, `checkRows`, `getTableCatalog`, 
`withTestTableAndViews`, `externalAppend`) so the same test logic runs in both 
classic and Connect modes.
    - **`DSv2TempViewWithStoredPlanTests`** (new): Shared trait containing the 
21 test methods, extracted from `DataSourceV2DataFrameSuite`.
    - **`DataSourceV2TempViewConnectSuite`** (new): Connect-mode runner that 
provides Connect-specific session, catalog access, and result comparison.
    - **`DataSourceV2DataFrameSuite`** (modified): Mixes in 
`DSv2TempViewWithStoredPlanTests` instead of inlining the tests; adds 
classic-mode implementations of the abstract hooks.
    
    ### Why are the changes needed?
    
    The existing temp view tests in `DataSourceV2DataFrameSuite` use 
classic-only APIs (`checkAnswer`, `catalog()` helper) and cannot run in Connect 
mode. Extracting them into a shared trait enables Connect parity testing.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR only adds tests and refactors test infrastructure.
    
    ### How was this patch tested?
    
    All 21 shared tests pass in both modes:
    ```
    build/sbt 'sql/testOnly *DataSourceV2DataFrameSuite'
    build/sbt 'connect/testOnly *DataSourceV2TempViewConnectSuite'
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-6)
    
    Closes #55571 from longvu-db/dsv2-connect-pr1-temp-views-clone.
    
    Authored-by: Thang Long Vu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 6993c9148090425af2124edfd120f1d6b79c403c)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../connect/DataSourceV2TempViewConnectSuite.scala |  83 +++
 .../connector/DSv2ExternalMutationTestBase.scala   |  76 +++
 .../DSv2TempViewWithStoredPlanTests.scala          | 588 ++++++++++++++++++++
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 613 ++-------------------
 4 files changed, 781 insertions(+), 579 deletions(-)

diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
new file mode 100644
index 000000000000..ce947379b233
--- /dev/null
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
+import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
InMemoryTableCatalog, TableCatalog}
+
+/**
+ * Connect-mode runner for [[DSv2TempViewWithStoredPlanTests]]. All test logic 
lives in the shared
+ * trait; this class only provides the Connect-specific session, catalog 
access, and result
+ * comparison.
+ */
+class DataSourceV2TempViewConnectSuite
+    extends SparkConnectServerTest
+    with DSv2TempViewWithStoredPlanTests {
+
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.testcat.copyOnLoad", "true")
+    .set("spark.sql.catalog.cachingcat", 
classOf[CachingInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
+
+  override protected def testPrefix: String = "[connect] "
+
+  override protected def withTestSession(fn: SparkSession => Unit): Unit =
+    withSession(fn)
+
+  // Cannot use QueryTest.checkAnswer directly because it accesses 
df.logicalPlan,
+  // df.queryExecution, and df.materializedRdd, which are not available on 
Connect *client*
+  // DataFrames (they throw ConnectClientUnsupportedErrors). Note: checkAnswer 
IS usable from
+  // Connect server tests that operate on classic server-side DataFrames, but 
in this suite
+  // `df` is a Connect client DataFrame returned by session.table() / 
session.sql().
+  // Instead, collect the rows and delegate to QueryTest.sameRows, which is 
the same
+  // value-based, order-agnostic comparison that checkAnswer uses internally.
+  override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit 
=
+    QueryTest.sameRows(expected, df.collect().toSeq).foreach(msg => fail(msg))
+
+  override protected def getTableCatalog[C <: TableCatalog: ClassTag](
+      session: SparkSession,
+      catalogName: String): C = {
+    val serverSession = getServerSession(session)
+    val catalog = 
serverSession.sessionState.catalogManager.catalog(catalogName)
+    val ct = implicitly[ClassTag[C]]
+    require(
+      ct.runtimeClass.isInstance(catalog),
+      s"Expected ${ct.runtimeClass.getName} but got 
${catalog.getClass.getName}")
+    catalog.asInstanceOf[C]
+  }
+
+  // No explicit clearCache() for cachingcat is needed here, unlike the 
classic suite.
+  // Each withSession call creates a freshly isolated SparkSession on the 
server side
+  // (via SparkConnectSessionManager.newIsolatedSession), and afterEach 
invalidates all
+  // sessions, so the CachingInMemoryTableCatalog instance is per-test.
+  override protected def withTestTableAndViews(
+      session: SparkSession,
+      table: String,
+      views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
+    try { fn }
+    finally {
+      views.foreach(v => session.sql(s"DROP VIEW IF EXISTS $v").collect())
+      session.sql(s"DROP TABLE IF EXISTS $table").collect()
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
new file mode 100644
index 000000000000..2e60c24c4460
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, 
Identifier, InMemoryBaseTable, TableCatalog, TableWritePrivilege}
+
+/**
+ * Base trait for DSv2 tests that involve external table mutations (writes, 
schema changes,
+ * drop/recreate) via the catalog API.
+ *
+ * Provides abstract methods so that the same test scenarios can run in both 
classic mode
+ * (where the test session IS the server session) and Connect mode (where the 
test session
+ * is a Connect client and catalog access requires the server session).
+ *
+ * Concrete suites override the abstract methods and mix in the test trait
+ * [[DSv2TempViewWithStoredPlanTests]].
+ */
+trait DSv2ExternalMutationTestBase extends QueryTest {
+
+  /** Prefix for test names, e.g. "" or "[connect] ". */
+  protected def testPrefix: String
+
+  /** Execute a test body with a session. */
+  protected def withTestSession(fn: SparkSession => Unit): Unit
+
+  /**
+   * Assert that a DataFrame's rows match the expected rows (order-agnostic).
+   */
+  protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit
+
+  /**
+   * Get a [[TableCatalog]] by name from the underlying session.
+   */
+  protected def getTableCatalog[C <: TableCatalog: ClassTag](
+      session: SparkSession,
+      catalogName: String): C
+
+  /** Cleanup wrapper: drop views and the table after the test body, even on 
failure. */
+  protected def withTestTableAndViews(
+      session: SparkSession,
+      table: String,
+      views: Seq[String] = Seq.empty)(fn: => Unit): Unit
+
+  /** Appends a row to a DSv2 table via the catalog API, bypassing the 
session. */
+  protected def externalAppend(
+      catalog: TableCatalog,
+      ident: Identifier,
+      row: InternalRow): Unit = {
+    val extTable = catalog
+      .loadTable(ident, util.Set.of(TableWritePrivilege.INSERT))
+      .asInstanceOf[InMemoryBaseTable]
+    val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
+    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
new file mode 100644
index 000000000000..eb40e3ac056f
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
+
+/**
+ * Shared temp view with stored plan tests for DSv2 tables. These tests verify 
that temp views
+ * backed by DSv2 tables correctly handle data changes, schema changes, and 
table recreation,
+ * both via session SQL and external catalog mutations.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect 
client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode 
`.collect()` on DDL
+ * is a no-op (DDL executes eagerly), so this is harmless.
+ */
+trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase {
+
+  private val T = "testcat.ns1.ns2.tbl"
+  private val CT = "cachingcat.ns1.ns2.tbl"
+  private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+  // Scenario 1.1 (session write)
+  test(s"${testPrefix}temp view with stored plan reflects session write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+        checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 1.2 (external write)
+  test(s"${testPrefix}temp view with stored plan reflects external write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200))
+
+        checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 1.2 connector w/ cache (external write, caching connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external 
write") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200))
+
+        // Caching connector returns stale table: external write invisible
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, external write 
becomes visible
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 2.1 (session ADD COLUMN)
+  test(s"${testPrefix}temp view with stored plan preserves schema after 
session ADD COLUMN") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        session.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect()
+        session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect()
+
+        // view preserves original 2-column schema, filter still applied
+        checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 2.2 (external ADD COLUMN)
+  test(s"${testPrefix}temp view with stored plan preserves schema after 
external ADD COLUMN") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // external schema change via catalog API
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
+        catalog.alterTable(testIdent, addCol)
+
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200, -1))
+
+        // view preserves original 2-column schema, filter still applied
+        checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external ADD 
COLUMN") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
+        catalog.alterTable(testIdent, addCol)
+
+        externalAppend(catalog = catalog, ident = testIdent, row = 
InternalRow(2, 200, -1))
+
+        // Caching connector returns stale table: external changes invisible
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, view preserves 
original 2-column schema
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 3.1 (session column removal)
+  test(s"${testPrefix}temp view with stored plan detects session column 
removal") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` INT has been removed"))
+      }
+    }
+  }
+
+  // Scenario 3.2 (external column removal)
+  test(s"${testPrefix}temp view with stored plan detects external column 
removal") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val dropCol = TableChange.deleteColumn(Array("salary"), false)
+        catalog.alterTable(testIdent, dropCol)
+
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` INT has been removed"))
+      }
+    }
+  }
+
+  // Scenario 3.2 connector w/ cache (external column removal, caching 
connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external 
column removal") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        val dropCol = TableChange.deleteColumn(Array("salary"), false)
+        catalog.alterTable(testIdent, dropCol)
+
+        // Caching connector returns stale table: column removal invisible, no 
error
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, column removal 
detected
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` INT has been removed"))
+      }
+    }
+  }
+
+  // Scenario 4.1 (session drop and recreate table)
+  test(s"${testPrefix}temp view with stored plan resolves to session-recreated 
table") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val originalTableId = catalog.loadTable(testIdent).id
+
+        session.sql(s"DROP TABLE $T").collect()
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+
+        val newTableId = catalog.loadTable(testIdent).id
+        assert(originalTableId != newTableId)
+
+        // view resolves to the new empty table
+        checkRows(session.table("v"), Seq.empty)
+
+        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+        checkRows(session.table("v"), Seq(Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 4.2 (external drop and recreate table)
+  test(s"${testPrefix}temp view with stored plan resolves to externally 
recreated table") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val originalTableId = catalog.loadTable(testIdent).id
+
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+
+        val newTableId = catalog.loadTable(testIdent).id
+        assert(originalTableId != newTableId)
+
+        // view resolves to the new empty table
+        checkRows(session.table("v"), Seq.empty)
+
+        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+        checkRows(session.table("v"), Seq(Row(2, 200)))
+      }
+    }
+  }
+
+  // Scenario 4.2 connector w/ cache (external drop/recreate, caching 
connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external 
drop/recreate") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        catalog.dropTable(testIdent)
+        catalog.createTable(
+          testIdent,
+          new TableInfo.Builder()
+            .withColumns(Array(
+              Column.create("id", IntegerType),
+              Column.create("salary", IntegerType)))
+            .build())
+
+        // Caching connector returns stale table: drop/recreate invisible
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, view resolves to new 
empty table
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkRows(session.table("v"), Seq.empty)
+      }
+    }
+  }
+
+  // Scenario 5.1 (session drop and re-add column with same type, multiple 
views)
+  test(s"${testPrefix}temp view with stored plan after session drop and re-add 
column same type" +
+      " with unfiltered view") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v", "v_no_filter", 
"v_filter_is_null")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(T).createOrReplaceTempView("v_no_filter")
+        session.table(T).filter("salary IS 
NULL").createOrReplaceTempView("v_filter_is_null")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+        checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 
1000)))
+        checkRows(session.table("v_filter_is_null"), Seq.empty)
+
+        // drop and re-add column with same name and type
+        session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+        session.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect()
+
+        // salary values are now null, so the filtered view returns nothing
+        checkRows(session.table("v"), Seq.empty)
+        // unfiltered view returns rows with null salary
+        checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, 
null)))
+        // IS NULL filter now matches all rows
+        checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, 
null)))
+      }
+    }
+  }
+
+  // Scenario 5.2 (external drop and re-add column with same type)
+  test(s"${testPrefix}temp view with stored plan after external drop and 
re-add column " +
+      "same type") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v", "v_no_filter", 
"v_filter_is_null")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        session.table(T).createOrReplaceTempView("v_no_filter")
+        session.table(T).filter("salary IS 
NULL").createOrReplaceTempView("v_filter_is_null")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+        checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 
1000)))
+        checkRows(session.table("v_filter_is_null"), Seq.empty)
+
+        // external drop and re-add column via catalog API
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val dropCol = TableChange.deleteColumn(Array("salary"), false)
+        val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+        catalog.alterTable(testIdent, dropCol, addCol)
+
+        // salary values are now null, so the filtered view returns nothing
+        checkRows(session.table("v"), Seq.empty)
+        // unfiltered view returns rows with null salary
+        checkRows(session.table("v_no_filter"), Seq(Row(1, null), Row(10, 
null)))
+        // IS NULL filter now matches all rows
+        checkRows(session.table("v_filter_is_null"), Seq(Row(1, null), Row(10, 
null)))
+      }
+    }
+  }
+
+  // Scenario 5.2 connector w/ cache (external drop/re-add column, caching 
connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external 
drop/re-add column " +
+      "same type") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        val dropCol = TableChange.deleteColumn(Array("salary"), false)
+        val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+        catalog.alterTable(testIdent, dropCol, addCol)
+
+        // Caching connector returns stale table: column drop/re-add invisible
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, salary values are 
null
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkRows(session.table("v"), Seq.empty)
+      }
+    }
+  }
+
+  // Scenario 6.1 (session drop and re-add column with different type)
+  test(s"${testPrefix}temp view with stored plan detects session column type 
change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+        session.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect()
+
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` type has changed from INT to STRING"))
+      }
+    }
+  }
+
+  // Scenario 6.2 (external drop and re-add column with different type)
+  test(s"${testPrefix}temp view with stored plan detects external column type 
change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val dropCol = TableChange.deleteColumn(Array("salary"), false)
+        val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+        catalog.alterTable(testIdent, dropCol, addCol)
+
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` type has changed from INT to STRING"))
+      }
+    }
+  }
+
+  // Scenario 6.2 connector w/ cache (external column type change, caching 
connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external 
column type change") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        val dropCol = TableChange.deleteColumn(Array("salary"), false)
+        val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+        catalog.alterTable(testIdent, dropCol, addCol)
+
+        // Caching connector returns stale table: type change invisible, no 
error
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, type change detected
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` type has changed from INT to STRING"))
+      }
+    }
+  }
+
+  // Scenario 7.1 (session type widening from INT to BIGINT)
+  test(s"${testPrefix}temp view with stored plan detects session type 
widening") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        session.sql(s"ALTER TABLE $T ALTER COLUMN salary TYPE LONG").collect()
+
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` type has changed from INT to BIGINT"))
+      }
+    }
+  }
+
+  // Scenario 7.2 (external type widening from INT to BIGINT)
+  test(s"${testPrefix}temp view with stored plan detects external type 
widening") {
+    withTestSession { session =>
+      withTestTableAndViews(session, T, Seq("v")) {
+        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+        val updateType = TableChange.updateColumnType(Array("salary"), 
LongType)
+        catalog.alterTable(testIdent, updateType)
+
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` type has changed from INT to BIGINT"))
+      }
+    }
+  }
+
+  // Scenario 7.2 connector w/ cache (external type widening, caching 
connector)
+  test(s"${testPrefix}connector w/ cache: temp view stale after external type 
widening") {
+    withTestSession { session =>
+      withTestTableAndViews(session, CT, Seq("v")) {
+        session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING 
foo").collect()
+        session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+
+        session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        val catalog = getTableCatalog[CachingInMemoryTableCatalog](session, 
"cachingcat")
+        val updateType = TableChange.updateColumnType(Array("salary"), 
LongType)
+        catalog.alterTable(testIdent, updateType)
+
+        // Caching connector returns stale table: type change invisible, no 
error
+        checkRows(session.table("v"), Seq(Row(1, 100)))
+
+        // REFRESH TABLE invalidates the connector cache, type change detected
+        session.sql(s"REFRESH TABLE $CT").collect()
+        checkError(
+          exception = intercept[AnalysisException] { 
session.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "- `salary` type has changed from INT to BIGINT"))
+      }
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 4fc93609fb41..13f8a3455480 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -21,14 +21,14 @@ import java.util
 import java.util.Collections
 
 import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, 
SparkSession}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{BufferedRows, 
CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, 
ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, 
InMemoryTableCatalog, MixedColumnIdTableCatalog, 
NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, 
SupportsV1OverwriteWithSaveAsTable, TableInfo, 
TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, 
Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, 
NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, 
SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, 
TypeChangeResetsColIdTableCatalog}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
@@ -46,7 +46,8 @@ import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.unsafe.types.UTF8String
 
 class DataSourceV2DataFrameSuite
-  extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests 
= false) {
+  extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests 
= false)
+  with DSv2TempViewWithStoredPlanTests {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import testImplicits._
 
@@ -87,6 +88,35 @@ class DataSourceV2DataFrameSuite
     catalog.asInstanceOf[InMemoryTableCatalog]
   }
 
+  // DSv2ExternalMutationTestBase implementations for classic mode
+  override protected def testPrefix: String = ""
+
+  override protected def withTestSession(fn: SparkSession => Unit): Unit = 
fn(spark)
+
+  override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit 
=
+    checkAnswer(df, expected)
+
+  override protected def getTableCatalog[C <: TableCatalog: ClassTag](
+      session: SparkSession,
+      catalogName: String): C = {
+    val c = catalog(catalogName)
+    val ct = implicitly[ClassTag[C]]
+    require(
+      ct.runtimeClass.isInstance(c),
+      s"Expected ${ct.runtimeClass.getName} but got ${c.getClass.getName}")
+    c.asInstanceOf[C]
+  }
+
+  override protected def withTestTableAndViews(
+      session: SparkSession,
+      table: String,
+      views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
+    withTable(table) {
+      try { fn }
+      finally { views.foreach(v => session.sql(s"DROP VIEW IF EXISTS $v")) }
+    }
+  }
+
   override def verifyTable(tableName: String, expected: DataFrame): Unit = {
     checkAnswer(spark.table(tableName), expected)
   }
@@ -2989,581 +3019,6 @@ class DataSourceV2DataFrameSuite
     }
   }
 
-  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
-  // Each test creates a DSv2 table with initial data, builds a temp view with 
a filter
-  // (to demonstrate that the stored plan is non-trivial), and then verifies 
the view
-  // behavior after various table modifications (session or external).
-
-  /** Appends a row to a DSv2 table via the catalog API, bypassing the 
session. */
-  // The row layout must match the current table column order.
-  private def externalAppend(
-      catalogName: String,
-      ident: Identifier,
-      row: InternalRow): Unit = {
-    val extTable = catalog(catalogName).loadTable(ident,
-      util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
-    val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
-    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
-  }
-
-  // Scenario 1.1 (session write)
-  test("temp view with stored plan reflects session write") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      sql(s"INSERT INTO $t VALUES (2, 200)")
-
-      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
-    }
-  }
-
-  // Scenario 1.2 (external write)
-  test("temp view with stored plan reflects external write") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external writer adds (2, 200) via direct catalog API
-      externalAppend(
-        catalogName = "testcat",
-        ident = ident,
-        row = InternalRow(2, 200))
-
-      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
-    }
-  }
-
-  // Scenario 1.2 connector w/ cache (external write, caching connector)
-  test("connector w/ cache: temp view stale after external write") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external writer adds (2, 200) via catalog API (bypasses cache)
-      externalAppend(
-        catalogName = "cachingcat",
-        ident = ident,
-        row = InternalRow(2, 200))
-
-      // Caching connector returns stale table: external write invisible
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, external write becomes 
visible
-      sql(s"REFRESH TABLE $t")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
-    }
-  }
-
-  // Scenario 2.1 (session ADD COLUMN)
-  test("temp view with stored plan preserves schema after session ADD COLUMN") 
{
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
-      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
-
-      // view preserves original 2-column schema, filter still applied
-      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
-    }
-  }
-
-  // Scenario 2.2 (external ADD COLUMN)
-  test("temp view with stored plan preserves schema after external ADD 
COLUMN") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external schema change via catalog API
-      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
-      catalog("testcat").alterTable(ident, addCol)
-
-      // external writer adds data with new schema
-      externalAppend(
-        catalogName = "testcat",
-        ident = ident,
-        row = InternalRow(2, 200, -1))
-
-      // view preserves original 2-column schema, filter still applied
-      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
-    }
-  }
-
-  // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
-  test("connector w/ cache: temp view stale after external ADD COLUMN") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external schema change + data via catalog API
-      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
-      catalog("cachingcat").alterTable(ident, addCol)
-
-      externalAppend(
-        catalogName = "cachingcat",
-        ident = ident,
-        row = InternalRow(2, 200, -1))
-
-      // Caching connector returns stale table: external changes invisible
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, view preserves 
original 2-column schema
-      sql(s"REFRESH TABLE $t")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
-    }
-  }
-
-  // Scenario 3.1 (session column removal)
-  test("temp view with stored plan detects session column removal") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // session schema change via SQL
-      sql(s"ALTER TABLE $t DROP COLUMN salary")
-
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` INT has been removed"))
-    }
-  }
-
-  // Scenario 3.2 (external column removal)
-  test("temp view with stored plan detects external column removal") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external schema change via catalog API
-      val dropCol = TableChange.deleteColumn(Array("salary"), false)
-      catalog("testcat").alterTable(ident, dropCol)
-
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` INT has been removed"))
-    }
-  }
-
-  // Scenario 3.2 connector w/ cache (external column removal, caching 
connector)
-  test("connector w/ cache: temp view stale after external column removal") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external column removal via catalog API
-      val dropCol = TableChange.deleteColumn(Array("salary"), false)
-      catalog("cachingcat").alterTable(ident, dropCol)
-
-      // Caching connector returns stale table: column removal invisible, no 
error
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, column removal detected
-      sql(s"REFRESH TABLE $t")
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` INT has been removed"))
-    }
-  }
-
-  // Scenario 4.1 (session drop and recreate table)
-  test("temp view with stored plan resolves to session-recreated table") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      val originalTableId = catalog("testcat").loadTable(ident).id
-
-      // session drop and recreate via SQL
-      sql(s"DROP TABLE $t")
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-
-      val newTableId = catalog("testcat").loadTable(ident).id
-      assert(originalTableId != newTableId)
-
-      // view resolves to the new empty table
-      checkAnswer(spark.table("v"), Seq.empty)
-
-      // insert new data and verify the view picks it up
-      sql(s"INSERT INTO $t VALUES (2, 200)")
-      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
-    }
-  }
-
-  // Scenario 4.2 (external drop and recreate table)
-  test("temp view with stored plan resolves to externally recreated table") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      val originalTableId = catalog("testcat").loadTable(ident).id
-
-      // external drop and recreate via catalog API
-      catalog("testcat").dropTable(ident)
-      catalog("testcat").createTable(
-        ident,
-        new TableInfo.Builder()
-          .withColumns(Array(
-            Column.create("id", IntegerType),
-            Column.create("salary", IntegerType)))
-          .build())
-
-      val newTableId = catalog("testcat").loadTable(ident).id
-      assert(originalTableId != newTableId)
-
-      // view resolves to the new empty table
-      checkAnswer(spark.table("v"), Seq.empty)
-
-      // insert new data and verify the view picks it up
-      sql(s"INSERT INTO $t VALUES (2, 200)")
-      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
-    }
-  }
-
-  // Scenario 4.2 connector w/ cache (external drop/recreate, caching 
connector)
-  test("connector w/ cache: temp view stale after external drop/recreate") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external drop and recreate via catalog API
-      catalog("cachingcat").dropTable(ident)
-      catalog("cachingcat").createTable(
-        ident,
-        new TableInfo.Builder()
-          .withColumns(Array(
-            Column.create("id", IntegerType),
-            Column.create("salary", IntegerType)))
-          .build())
-
-      // Caching connector returns stale table: drop/recreate invisible
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, view resolves to new 
empty table
-      sql(s"REFRESH TABLE $t")
-      checkAnswer(spark.table("v"), Seq.empty)
-    }
-  }
-
-  // Scenario 5.1 (session drop and re-add column with same type, multiple 
views)
-  test("temp view with stored plan after session drop and re-add column same 
type" +
-      " with unfiltered view") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      spark.table(t).createOrReplaceTempView("v_no_filter")
-      spark.table(t).filter("salary IS 
NULL").createOrReplaceTempView("v_filter_is_null")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
-      checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
-
-      // drop and re-add column with same name and type
-      sql(s"ALTER TABLE $t DROP COLUMN salary")
-      sql(s"ALTER TABLE $t ADD COLUMN salary INT")
-
-      // salary values are now null, so the filtered view returns nothing
-      checkAnswer(spark.table("v"), Seq.empty)
-      // unfiltered view returns rows with null salary
-      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
-      // IS NULL filter now matches all rows
-      checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, 
null)))
-    }
-  }
-
-  // Scenario 5.2 (external drop and re-add column with same type)
-  test("temp view with stored plan after external drop and re-add column same 
type") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      spark.table(t).createOrReplaceTempView("v_no_filter")
-      spark.table(t).filter("salary IS 
NULL").createOrReplaceTempView("v_filter_is_null")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
-      checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
-
-      // external drop and re-add column via catalog API
-      val dropCol = TableChange.deleteColumn(Array("salary"), false)
-      val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
-      catalog("testcat").alterTable(ident, dropCol, addCol)
-
-      // salary values are now null, so the filtered view returns nothing
-      checkAnswer(spark.table("v"), Seq.empty)
-      // unfiltered view returns rows with null salary
-      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
-      // IS NULL filter now matches all rows
-      checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, 
null)))
-    }
-  }
-
-  // Scenario 5.2 connector w/ cache (external drop/re-add column, caching 
connector)
-  test("connector w/ cache: temp view stale after external drop/re-add column 
same type") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external drop and re-add column with same type via catalog API
-      val dropCol = TableChange.deleteColumn(Array("salary"), false)
-      val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
-      catalog("cachingcat").alterTable(ident, dropCol, addCol)
-
-      // Caching connector returns stale table: column drop/re-add invisible
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, salary values are null
-      sql(s"REFRESH TABLE $t")
-      checkAnswer(spark.table("v"), Seq.empty)
-    }
-  }
-
-  // Scenario 6.1 (session drop and re-add column with different type)
-  test("temp view with stored plan detects session column type change") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // drop and re-add column with same name but different type
-      sql(s"ALTER TABLE $t DROP COLUMN salary")
-      sql(s"ALTER TABLE $t ADD COLUMN salary STRING")
-
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` type has changed from INT to STRING"))
-    }
-  }
-
-  // Scenario 6.2 (external drop and re-add column with different type)
-  test("temp view with stored plan detects external column type change") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external drop and re-add column with different type via catalog API
-      val dropCol = TableChange.deleteColumn(Array("salary"), false)
-      val addCol = TableChange.addColumn(Array("salary"), StringType, true)
-      catalog("testcat").alterTable(ident, dropCol, addCol)
-
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` type has changed from INT to STRING"))
-    }
-  }
-
-  // Scenario 6.2 connector w/ cache (external column type change, caching 
connector)
-  test("connector w/ cache: temp view stale after external column type 
change") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external drop and re-add column with different type via catalog API
-      val dropCol = TableChange.deleteColumn(Array("salary"), false)
-      val addCol = TableChange.addColumn(Array("salary"), StringType, true)
-      catalog("cachingcat").alterTable(ident, dropCol, addCol)
-
-      // Caching connector returns stale table: type change invisible, no error
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, type change detected
-      sql(s"REFRESH TABLE $t")
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` type has changed from INT to STRING"))
-    }
-  }
-
-  // Scenario 7.1 (session type widening from INT to BIGINT)
-  test("temp view with stored plan detects session type widening") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // session type widening via SQL
-      sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE LONG")
-
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` type has changed from INT to BIGINT"))
-    }
-  }
-
-  // Scenario 7.2 (external type widening from INT to BIGINT)
-  test("temp view with stored plan detects external type widening") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // widen salary type from INT to BIGINT via catalog API
-      val updateType = TableChange.updateColumnType(Array("salary"), LongType)
-      catalog("testcat").alterTable(ident, updateType)
-
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` type has changed from INT to BIGINT"))
-    }
-  }
-
-  // Scenario 7.2 connector w/ cache (external type widening, caching 
connector)
-  test("connector w/ cache: temp view stale after external type widening") {
-    val t = "cachingcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // external type widening via catalog API
-      val updateType = TableChange.updateColumnType(Array("salary"), LongType)
-      catalog("cachingcat").alterTable(ident, updateType)
-
-      // Caching connector returns stale table: type change invisible, no error
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-
-      // REFRESH TABLE invalidates the connector cache, type change detected
-      sql(s"REFRESH TABLE $t")
-      checkError(
-        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
-        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
-        parameters = Map(
-          "viewName" -> "`v`",
-          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
-          "colType" -> "data",
-          "errors" -> "- `salary` type has changed from INT to BIGINT"))
-    }
-  }
-
   test("cached DSv2 table DataFrame is refreshed and reused after insert") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to