This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 236f04007c76 [SPARK-56643][SQL][TESTS] Add DSv2 temp view with stored plan tests
236f04007c76 is described below

commit 236f04007c76cbc7e91ecfeca8341fe323109e35
Author: Thang Long Vu <[email protected]>
AuthorDate: Wed May 20 21:25:41 2026 +0800

    [SPARK-56643][SQL][TESTS] Add DSv2 temp view with stored plan tests
    
    ### What changes were proposed in this pull request?
    
    Add new tests to `DataSourceV2DataFrameSuite` that verify temporary view behavior with stored plans when the underlying DSv2 table changes.
    
    Each scenario tests three sub-cases:
    - **Session write**: Table modification via SQL (same session).
    - **External write**: Table modification via the catalog API (simulates writes from outside the session).
    - **External write with cache**: Same as external write, but with the temp view cached first.
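    
    The stale-read effect exercised by the cache sub-case can be sketched without Spark. This is a minimal illustration, not the test code: `BackingStore`, `CachingLoader`, and `StaleCacheDemo` are hypothetical names, and the only assumption carried over from this change is that a load cache built on `ConcurrentHashMap.computeIfAbsent` keeps serving the first snapshot until it is invalidated.
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable
    
    // Hypothetical stand-in for a catalog's backing storage: table name -> rows.
    final class BackingStore {
      private val tables = mutable.Map.empty[String, Vector[(Int, Int)]]
    
      def write(name: String, row: (Int, Int)): Unit =
        tables(name) = tables.getOrElse(name, Vector.empty) :+ row
    
      // Returns an immutable snapshot of the current rows.
      def snapshot(name: String): Vector[(Int, Int)] =
        tables.getOrElse(name, Vector.empty)
    }
    
    // Hypothetical caching front end: the first load is fresh, later loads hit the cache.
    final class CachingLoader(store: BackingStore) {
      private val cache = new ConcurrentHashMap[String, Vector[(Int, Int)]]()
    
      def load(name: String): Vector[(Int, Int)] =
        cache.computeIfAbsent(name, _ => store.snapshot(name))
    
      def invalidate(name: String): Unit = {
        cache.remove(name)
        ()
      }
    }
    
    object StaleCacheDemo {
      // Stand-in for the stored plan: a filter re-applied to whatever the loader returns.
      def view(loader: CachingLoader): Vector[(Int, Int)] =
        loader.load("tbl").filter { case (_, salary) => salary < 999 }
    
      def main(args: Array[String]): Unit = {
        val store = new BackingStore
        store.write("tbl", (1, 100))
        store.write("tbl", (10, 1000))
        val loader = new CachingLoader(store)
    
        val before = view(loader)    // first load populates the cache
        store.write("tbl", (2, 200)) // external write bypasses the loader
        val stale = view(loader)     // still served from the cached snapshot
        loader.invalidate("tbl")     // plays the role of REFRESH TABLE
        val fresh = view(loader)
    
        assert(before == Vector((1, 100)))
        assert(stale == Vector((1, 100)))
        assert(fresh == Vector((1, 100), (2, 200)))
      }
    }
    ```
    
    The sketch invalidates explicitly because, in this model, nothing else evicts the cached snapshot; the tests in this change rely on `REFRESH TABLE` for the same purpose.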
    
    **Scenarios:**
    
    1. **Insert new data** (1.1 session, 1.2 external, 1.3 external+cache): Temp view with filter reflects new data.
    2. **ADD COLUMN** (2.1 session, 2.2 external, 2.3 external+cache): Temp view preserves original schema after ADD COLUMN.
    3. **DROP COLUMN** (3.1 session, 3.2 external, 3.3 external+cache): Temp view detects column removal.
    4. **Drop and recreate table** (4.1 session, 4.2 external, 4.3 external+cache): Temp view resolves to recreated table.
    5. **Drop and re-add column with same type** (5.1 session, 5.2 external, 5.3 external+cache): Schema validation passes, view continues working.
    6. **Drop and re-add column with different type** (6.1 session, 6.2 external, 6.3 external+cache): Temp view detects type change.
    7. **Type widening** (7.1 session, 7.2 external, 7.3 external+cache): Temp view detects INT to BIGINT type change.
    
    Each test creates a DSv2 table, inserts initial data, builds a temp view with a filter (`salary < 999`) to demonstrate stored plan behavior, and verifies the view after table modifications.
    
    #### Shared test infrastructure changes
    
    - **`CachingInMemoryTableCatalog`** (new): A per-instance caching test catalog that wraps `InMemoryTableCatalog` with a `ConcurrentHashMap`-based `loadTable` cache. Simulates Iceberg-style stale-cache behavior where the catalog returns a cached `Table` object even after external writes modify the underlying data.
    - **`BasicInMemoryTableCatalog.alterTable`**: Fixed data-migration semantics for combined DROP + ADD column changes. The intermediate schema (after drops, before adds) is now passed to `alterTableWithData` so that old column data is properly dropped rather than retained by name-matching.
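    
    The `alterTable` fix can be illustrated with a small, Spark-free sketch. Everything here is hypothetical (`Row`, `migrate`, `read`, `DropThenAddDemo`); it only models the idea that a migration which keeps old fields by name will retain stale values when a column is dropped and re-added under the same name, unless the name set it matches against is the post-drop schema.
    
    ```scala
    object DropThenAddDemo {
      // Hypothetical row model: field name -> value.
      type Row = Map[String, Any]
    
      // Name-matching migration: keep only the fields whose names are in keepNames.
      def migrate(rows: Seq[Row], keepNames: Set[String]): Seq[Row] =
        rows.map(_.filter { case (name, _) => keepNames(name) })
    
      // Reads a row under the final schema; fields missing from the row surface as null.
      def read(row: Row, finalSchema: Seq[String]): Seq[Any] =
        finalSchema.map(name => row.getOrElse(name, null))
    
      def main(args: Array[String]): Unit = {
        val rows: Seq[Row] = Seq(
          Map("id" -> 1, "salary" -> 100),
          Map("id" -> 10, "salary" -> 1000))
        val originalSchema = Seq("id", "salary")
        val dropped = Set("salary")
        val finalSchema = Seq("id", "salary") // salary dropped, then re-added
    
        // Matching against the final schema keeps the old salary values.
        val retained = migrate(rows, finalSchema.toSet).map(read(_, finalSchema))
    
        // Matching against the post-drop schema removes them, so the re-added
        // column reads as null for existing rows.
        val schemaAfterDrops = originalSchema.filterNot(dropped)
        val cleared = migrate(rows, schemaAfterDrops.toSet).map(read(_, finalSchema))
    
        assert(retained == Seq(Seq(1, 100), Seq(10, 1000)))
        assert(cleared == Seq(Seq(1, null), Seq(10, null)))
      }
    }
    ```
    
    The second outcome matches what the external drop and re-add test in this change expects from a combined `deleteColumn` + `addColumn` call: existing rows expose `null` for the re-added column.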
    
    ### Why are the changes needed?
    
    The existing SPARK-53924 tests in `DataSourceV2DataFrameSuite` cover basic schema change detection for temp views on DSv2 tables but are missing:
    - Filter-based stored plan tests
    - External write variants (using catalog API to simulate writes from outside the session)
    - External write with cache variants
    - Scenarios 5, 6, 7 (drop+re-add column same/different type, type widening)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This PR only adds tests and modifies test-only infrastructure.
    
    ### How was this patch tested?
    
    New tests in `DataSourceV2DataFrameSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-6)
    
    Closes #55540 from longvu-db/dsv2-classic-pr1-temp-views.
    
    Authored-by: Thang Long Vu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit e7c0a3490ca80b56bd4b4a1982954c48763d94ea)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalog/CachingInMemoryTableCatalog.scala      |  50 ++
 .../connector/catalog/InMemoryTableCatalog.scala   |  21 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 620 ++++++++++++++++++++-
 3 files changed, 665 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
new file mode 100644
index 000000000000..f8e3224fa7e1
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first [[loadTable]], returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the write-variant [[loadTable]], which is not
+ * cached, so they modify the underlying table directly. Cached [[loadTable]]
+ * results may still be stale until [[clearCache]] or REFRESH TABLE (which
+ * invokes [[invalidateTable]]) is called.
+ *
+ * Only the primary [[loadTable(ident:org\.apache\.spark\.sql\.connector\.catalog\.Identifier)*]]
+ * overload is cached. Version and timestamp overloads bypass the cache, matching
+ * time-travel semantics. [[dropTable]], [[createTable]], and [[alterTable]] do not
+ * invalidate the cache, matching the behavior of real caching connectors.
+ */
+class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
+  private val cachedTables = new ConcurrentHashMap[Identifier, Table]()
+
+  override def loadTable(ident: Identifier): Table =
+    cachedTables.computeIfAbsent(ident, _ => super.loadTable(ident))
+
+  override def invalidateTable(ident: Identifier): Unit = {
+    super.invalidateTable(ident)
+    cachedTables.remove(ident)
+  }
+
+  def clearCache(): Unit = cachedTables.clear()
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 1654e9e9a66d..d39e422a9d9e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -179,6 +179,23 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       throw new IllegalArgumentException(s"Cannot drop all fields")
     }
 
+    // Compute the intermediate schema that only reflects column deletions.
+    // [[InMemoryBaseTable.alterTableWithData]] decides which old-row fields to keep by
+    // matching names against its newSchema argument. Passing this post-drop schema
+    // (rather than the final schema that may re-add a same-named column) ensures that
+    // dropped column values are physically removed from existing data.
+    // Note: this only handles top-level column deletions. Nested column deletions
+    // would need additional handling, but [[alterTableWithData]] only filters by
+    // top-level field name anyway.
+    val deletedTopLevelNames = changes.collect {
+      case d: TableChange.DeleteColumn if d.fieldNames.length == 1 => d.fieldNames.head
+    }.toSet
+    val schemaAfterDrops = if (deletedTopLevelNames.nonEmpty) {
+      StructType(table.schema.fields.filterNot(f => deletedTopLevelNames(f.name)))
+    } else {
+      schema
+    }
+
     table.increaseVersion()
     val currentVersion = table.version()
     val columnsWithIds = InMemoryBaseTable.assignMissingIds(
@@ -193,14 +210,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
           properties = properties,
           constraints = constraints,
           id = table.id)
-          .alterTableWithData(table.data, schema)
+          .alterTableWithData(table.data, schemaAfterDrops)
       case _: InMemoryTableWithV2Filter =>
         new InMemoryTableWithV2Filter(
           name = table.name,
           columns = columnsWithIds,
           partitioning = finalPartitioning,
           properties = properties)
-          .alterTableWithData(table.data, schema)
+          .alterTableWithData(table.data, schemaAfterDrops)
       case other =>
         throw new UnsupportedOperationException(
           s"Unsupported InMemoryBaseTable subclass: ${other.getClass.getName}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index c532ef359a7c..4fc93609fb41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
@@ -54,6 +55,9 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
     .set("spark.sql.catalog.testcat.copyOnLoad", "true")
     .set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat",
+      classOf[CachingInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
     .set("spark.sql.catalog.nullidcat",
       classOf[NullTableIdInMemoryTableCatalog].getName)
     .set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
@@ -71,6 +75,7 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
 
   after {
+    catalog("cachingcat").asInstanceOf[CachingInMemoryTableCatalog].clearCache()
     spark.sessionState.catalogManager.reset()
   }
 
@@ -2520,29 +2525,6 @@ class DataSourceV2DataFrameSuite
     }
   }
 
-  test("temp view with stored plan after session drop and re-add column same type") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
-      // create two temp views with salary filters
-      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
-      spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_null")
-      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
-      checkAnswer(spark.table("v_null"), Seq.empty)
-
-      // drop and re-add column with same name and type
-      sql(s"ALTER TABLE $t DROP COLUMN salary")
-      sql(s"ALTER TABLE $t ADD COLUMN salary INT")
-
-      // salary values are now null, so the salary < 999 filter returns nothing
-      checkAnswer(spark.table("v"), Seq.empty)
-      // IS NULL filter now matches all rows
-      checkAnswer(spark.table("v_null"), Seq(Row(1, null), Row(10, null)))
-    }
-  }
-
   // Column ID tests: Write operations
   //
   // [[writeTo().append()]] eagerly executes the command during the
@@ -2723,6 +2705,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using DataFrame API
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // add top-level column to underlying table
@@ -2730,6 +2713,7 @@ class DataSourceV2DataFrameSuite
 
       // accessing temp view should succeed as top-level column additions are allowed
       // view captures original columns
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert data to verify view still works correctly
@@ -2745,6 +2729,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using DataFrame API
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "address"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // add nested column to underlying table
@@ -2769,6 +2754,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data", "age"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // drop column from underlying table
@@ -2793,6 +2779,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using DataFrame API
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "address"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // drop nested column from underlying table
@@ -2817,6 +2804,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // change nullability constraint using ALTER TABLE
@@ -2858,6 +2846,7 @@ class DataSourceV2DataFrameSuite
       assert(originalTableId != newTableId)
 
       // accessing temp view should work despite table ID change (returns empty data)
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert new data and verify view reflects it
@@ -2873,6 +2862,7 @@ class DataSourceV2DataFrameSuite
       sql(s"CREATE TABLE $t (id bigint, data STRING, extra INT) USING foo")
 
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data", "extra"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // alter table
@@ -2883,6 +2873,7 @@ class DataSourceV2DataFrameSuite
 
       // recreate view with updated schema
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // now it should work with new schema
@@ -2913,6 +2904,7 @@ class DataSourceV2DataFrameSuite
 
       // accessing temp view should succeed as top-level column additions are allowed
 
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
     }
   }
@@ -2925,6 +2917,7 @@ class DataSourceV2DataFrameSuite
 
         // create temp view using SQL that should capture plan
         sql(s"CREATE OR REPLACE TEMPORARY VIEW v AS SELECT * FROM $t")
+        assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
         checkAnswer(spark.table("v"), Seq.empty)
 
         // verify that view stores analyzed plan
@@ -2935,6 +2928,7 @@ class DataSourceV2DataFrameSuite
         sql(s"ALTER TABLE $t ADD COLUMN age int")
 
         // accessing temp view should succeed as top-level column additions are allowed
+        assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
         checkAnswer(spark.table("v"), Seq.empty)
 
         // insert data to verify view still works correctly
@@ -2951,6 +2945,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "name"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // change VARCHAR(10) to VARCHAR(20)
@@ -2975,6 +2970,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert data into underlying table (no schema change)
@@ -2993,6 +2989,581 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+  // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+  // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+  // behavior after various table modifications (session or external).
+
+  /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */
+  // The row layout must match the current table column order.
+  private def externalAppend(
+      catalogName: String,
+      ident: Identifier,
+      row: InternalRow): Unit = {
+    val extTable = catalog(catalogName).loadTable(ident,
+      util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+    val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
+    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+  }
+
+  // Scenario 1.1 (session write)
+  test("temp view with stored plan reflects session write") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 (external write)
+  test("temp view with stored plan reflects external write") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via direct catalog API
+      externalAppend(
+        catalogName = "testcat",
+        ident = ident,
+        row = InternalRow(2, 200))
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 connector w/ cache (external write, caching connector)
+  test("connector w/ cache: temp view stale after external write") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via catalog API (bypasses cache)
+      externalAppend(
+        catalogName = "cachingcat",
+        ident = ident,
+        row = InternalRow(2, 200))
+
+      // Caching connector returns stale table: external write invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, external write becomes visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.1 (session ADD COLUMN)
+  test("temp view with stored plan preserves schema after session ADD COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 (external ADD COLUMN)
+  test("temp view with stored plan preserves schema after external ADD COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+      catalog("testcat").alterTable(ident, addCol)
+
+      // external writer adds data with new schema
+      externalAppend(
+        catalogName = "testcat",
+        ident = ident,
+        row = InternalRow(2, 200, -1))
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+  test("connector w/ cache: temp view stale after external ADD COLUMN") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change + data via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+      catalog("cachingcat").alterTable(ident, addCol)
+
+      externalAppend(
+        catalogName = "cachingcat",
+        ident = ident,
+        row = InternalRow(2, 200, -1))
+
+      // Caching connector returns stale table: external changes invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 3.1 (session column removal)
+  test("temp view with stored plan detects session column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // session schema change via SQL
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 (external column removal)
+  test("temp view with stored plan detects external column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("testcat").alterTable(ident, dropCol)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 connector w/ cache (external column removal, caching connector)
+  test("connector w/ cache: temp view stale after external column removal") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external column removal via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("cachingcat").alterTable(ident, dropCol)
+
+      // Caching connector returns stale table: column removal invisible, no error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, column removal detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 4.1 (session drop and recreate table)
+  test("temp view with stored plan resolves to session-recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // session drop and recreate via SQL
+      sql(s"DROP TABLE $t")
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 (external drop and recreate table)
+  test("temp view with stored plan resolves to externally recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // external drop and recreate via catalog API
+      catalog("testcat").dropTable(ident)
+      catalog("testcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
+  test("connector w/ cache: temp view stale after external drop/recreate") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and recreate via catalog API
+      catalog("cachingcat").dropTable(ident)
+      catalog("cachingcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      // Caching connector returns stale table: drop/recreate invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq.empty)
+    }
+  }
+
+  // Scenario 5.1 (session drop and re-add column with same type, multiple views)
+  test("temp view with stored plan after session drop and re-add column same type" +
+      " with unfiltered view") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      spark.table(t).createOrReplaceTempView("v_no_filter")
+      spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+      checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+      // drop and re-add column with same name and type
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+      sql(s"ALTER TABLE $t ADD COLUMN salary INT")
+
+      // salary values are now null, so the filtered view returns nothing
+      checkAnswer(spark.table("v"), Seq.empty)
+      // unfiltered view returns rows with null salary
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+      // IS NULL filter now matches all rows
+      checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+    }
+  }
+
+  // Scenario 5.2 (external drop and re-add column with same type)
+  test("temp view with stored plan after external drop and re-add column same type") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      spark.table(t).createOrReplaceTempView("v_no_filter")
+      spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+      checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+      // external drop and re-add column via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+      catalog("testcat").alterTable(ident, dropCol, addCol)
+
+      // salary values are now null, so the filtered view returns nothing
+      checkAnswer(spark.table("v"), Seq.empty)
+      // unfiltered view returns rows with null salary
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+      // IS NULL filter now matches all rows
+      checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+    }
+  }
+
+  // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector)
+  test("connector w/ cache: temp view stale after external drop/re-add column same type") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and re-add column with same type via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+      catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+      // Caching connector returns stale table: column drop/re-add invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, salary values are null
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq.empty)
+    }
+  }
+
+  // Scenario 6.1 (session drop and re-add column with different type)
+  test("temp view with stored plan detects session column type change") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // drop and re-add column with same name but different type
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+      sql(s"ALTER TABLE $t ADD COLUMN salary STRING")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to STRING"))
+    }
+  }
+
+  // Scenario 6.2 (external drop and re-add column with different type)
+  test("temp view with stored plan detects external column type change") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and re-add column with different type via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+      catalog("testcat").alterTable(ident, dropCol, addCol)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to STRING"))
+    }
+  }
+
+  // Scenario 6.2 connector w/ cache (external column type change, caching connector)
+  test("connector w/ cache: temp view stale after external column type change") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and re-add column with different type via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+      catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+      // Caching connector returns stale table: type change invisible, no error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, type change detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to STRING"))
+    }
+  }
+
+  // Scenario 7.1 (session type widening from INT to BIGINT)
+  test("temp view with stored plan detects session type widening") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // session type widening via SQL
+      sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE LONG")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to BIGINT"))
+    }
+  }
+
+  // Scenario 7.2 (external type widening from INT to BIGINT)
+  test("temp view with stored plan detects external type widening") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // widen salary type from INT to BIGINT via catalog API
+      val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+      catalog("testcat").alterTable(ident, updateType)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to BIGINT"))
+    }
+  }
+
+  // Scenario 7.2 connector w/ cache (external type widening, caching connector)
+  test("connector w/ cache: temp view stale after external type widening") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external type widening via catalog API
+      val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+      catalog("cachingcat").alterTable(ident, updateType)
+
+      // Caching connector returns stale table: type change invisible, no error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, type change detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to BIGINT"))
+    }
+  }
+
   test("cached DSv2 table DataFrame is refreshed and reused after insert") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
@@ -3077,6 +3648,7 @@ class DataSourceV2DataFrameSuite
 
       // verify external changes are reflected correctly when table is queried
       assertNotCached(spark.table(t))
+      assert(spark.table(t).schema.fieldNames.toSeq == Seq("id", "value", "category"))
       checkAnswer(spark.table(t), Seq.empty)
     }
   }
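
The "connector w/ cache" tests in the diff above all follow one pattern: an external change stays invisible to the temp view until `REFRESH TABLE` invalidates the connector cache. The following is a minimal sketch of that pattern in plain Scala, not Spark code. `ToyTable`, `ToyStore`, `ToyCachingCatalog`, and `StaleCacheSketch` are hypothetical names introduced only for illustration, and the sketch assumes the caching connector simply memoizes the first table it loads; the real test catalog may behave differently.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Spark's API: a table is a schema plus rows.
final case class ToyTable(schema: Seq[(String, String)], rows: Seq[Seq[Any]])

// Backing store that "external" writers modify directly.
final class ToyStore {
  private val tables = mutable.Map.empty[String, ToyTable]
  def put(name: String, table: ToyTable): Unit = tables.update(name, table)
  def get(name: String): ToyTable = tables(name)
}

// Catalog that keeps returning the first table it loaded until refresh() is called.
final class ToyCachingCatalog(store: ToyStore) {
  private val cache = mutable.Map.empty[String, ToyTable]
  def loadTable(name: String): ToyTable =
    cache.getOrElseUpdate(name, store.get(name))
  def refresh(name: String): Unit = { cache.remove(name); () }
}

object StaleCacheSketch {
  def main(args: Array[String]): Unit = {
    val store = new ToyStore
    val catalog = new ToyCachingCatalog(store)
    val schema = Seq("id" -> "INT", "salary" -> "INT")
    store.put("tbl", ToyTable(schema, Seq(Seq(1, 100), Seq(10, 1000))))

    // First load populates the cache.
    assert(catalog.loadTable("tbl").rows.size == 2)

    // External drop and recreate: the store now holds an empty table.
    store.put("tbl", ToyTable(schema, Seq.empty))

    // The cached entry hides the change, so readers still see the old rows.
    assert(catalog.loadTable("tbl").rows.size == 2)

    // Invalidating the cache makes the recreated, empty table visible.
    catalog.refresh("tbl")
    assert(catalog.loadTable("tbl").rows.isEmpty)
  }
}
```

The same shape would apply to the schema-change scenarios: a stale entry keeps the old column type, so a type mismatch can only surface after the refresh step.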
