cloud-fan commented on code in PR #55540:
URL: https://github.com/apache/spark/pull/55540#discussion_r3266598523


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first loadTable, returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the SQL path which modifies the
+ * original table and invalidates, but direct catalog API
+ * modifications are not visible until the cache is cleared.

Review Comment:
   The "and invalidates" half of this sentence is both grammatically incomplete 
(no object) and not actually exercised by any test in this PR — `INSERT` goes 
through the un-overridden write-variant `loadTable(ident, writePrivileges)` and 
never touches the cache. Only explicit `REFRESH TABLE` / `invalidateTable` 
clears the cache. Suggest tightening to describe what the code actually does, 
e.g.:
   
   > Session writes go through the write-variant `loadTable`, which is not 
cached, so they modify the underlying table directly. Cached `loadTable` 
results may still be stale until `clearCache()` or `REFRESH TABLE` (which 
invokes `invalidateTable`) is called.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first loadTable, returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the SQL path which modifies the
+ * original table and invalidates, but direct catalog API
+ * modifications are not visible until the cache is cleared.
+ *
+ * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate
+ * cache expiration (like Iceberg's 30-second TTL).
+ */
+class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
+  import CachingInMemoryTableCatalog._
+
+  override def loadTable(ident: Identifier): Table = {
+    cachedTables.computeIfAbsent(cacheKey(name, ident), _ => {
+      super.loadTable(ident)
+    })
+  }
+
+  override def invalidateTable(ident: Identifier): Unit = {
+    super.invalidateTable(ident)
+    cachedTables.remove(cacheKey(name, ident))
+  }
+
+  private def cacheKey(
+      catalog: String, ident: Identifier): String = {
+    s"$catalog.${ident.toString}"
+  }
+}
+
+object CachingInMemoryTableCatalog {
+  private val cachedTables =
+    new ConcurrentHashMap[String, Table]()
+
+  def clearCache(): Unit = cachedTables.clear()

Review Comment:
   The cache is a JVM-global static, which can leak across suites if any other 
suite ever uses this catalog in the same JVM. Within this PR the per-test 
`after { clearCache() ... }` works, but a per-instance cache would be safer and 
also lets you key directly by `Identifier`:
   
   ```scala
   class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
     private val cachedTables = new ConcurrentHashMap[Identifier, Table]()
   
     override def loadTable(ident: Identifier): Table =
       cachedTables.computeIfAbsent(ident, _ => super.loadTable(ident))
   
     override def invalidateTable(ident: Identifier): Unit = {
       super.invalidateTable(ident)
       cachedTables.remove(ident)
     }
   
     def clearCache(): Unit = cachedTables.clear()
   }
   ```
   
   `DataSourceV2DataFrameSuite.after` would then call 
`catalog("cachingcat").asInstanceOf[CachingInMemoryTableCatalog].clearCache()` 
instead of the static.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala:
##########
@@ -179,6 +179,20 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       throw new IllegalArgumentException(s"Cannot drop all fields")
     }
 
+    // Compute the intermediate schema with only column deletions applied.
+    // This is used for data migration so that dropped column values are 
physically removed,
+    // even when a column with the same name is re-added in the same ALTER 
call.
+    val deleteOnlyChanges = 
changes.filter(_.isInstanceOf[TableChange.DeleteColumn])
+    val schemaAfterDrops = if (deleteOnlyChanges.nonEmpty) {
+      CatalogV2Util.applySchemaChanges(
+        table.schema,
+        deleteOnlyChanges,
+        tableProvider = Some("in-memory"),
+        statementType = "ALTER TABLE")
+    } else {
+      schema
+    }

Review Comment:
   Two things on this block:
   
   1. **The PR description undersells this.** This is a behavior change in 
shared test infrastructure — any test that calls `catalog.alterTable(ident, 
drop, add)` in a single call now gets different data-migration semantics. Worth 
calling out explicitly in the PR description.
   2. **The `why` is non-obvious from the current comment.** 
`InMemoryBaseTable.alterTableWithData` decides which old-row fields to keep by 
matching names against its `newSchema` argument. Passing the *post-drop 
intermediate* schema (rather than the final schema with `salary` re-added) is 
what makes the old data actually drop out — otherwise the name match retains 
the old `salary` value into the new column. A one-line pointer at 
`alterTableWithData` would help future readers.
   
   Optional simplification: the recomputation via `applySchemaChanges` can be 
replaced with a direct filter, since we already have `table.schema` in hand:
   
   ```scala
   val deletedTopLevelNames = changes.collect {
     case d: TableChange.DeleteColumn if d.fieldNames.length == 1 => 
d.fieldNames.head
   }.toSet
   val schemaAfterDrops = if (deletedTopLevelNames.nonEmpty) {
     StructType(table.schema.fields.filterNot(f => 
deletedTopLevelNames(f.name)))
   } else {
     schema
   }
   ```
   
   (Current tests only drop top-level columns; if nested drops ever need this 
treatment too, the filter would need to be extended — but the current 
`applySchemaChanges` form would need the same care for nested cases since 
`alterTableWithData` only filters by top-level field name anyway.)



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,584 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+  // Each test creates a DSv2 table with initial data, builds a temp view with 
a filter
+  // (to demonstrate that the stored plan is non-trivial), and then verifies 
the view
+  // behavior after various table modifications (session or external).
+
+  /** Appends rows to a DSv2 table via the catalog API, bypassing the session. 
*/
+  private def externalAppend(
+      catalogName: String,
+      ident: Identifier,
+      schema: StructType,
+      row: InternalRow): Unit = {
+    val extTable = catalog(catalogName).loadTable(ident,
+      util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+  }
+
+  // Scenario 1.1 (session write)
+  test("temp view with stored plan reflects session write") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 (external write)
+  test("temp view with stored plan reflects external write") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via direct catalog API
+      externalAppend(
+        catalogName = "testcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT"),
+        row = InternalRow(2, 200))
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 connector w/ cache (external write, caching connector)
+  test("connector w/ cache: temp view stale after external write") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via catalog API (bypasses cache)
+      externalAppend(
+        catalogName = "cachingcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT"),
+        row = InternalRow(2, 200))
+
+      // Caching connector returns stale table: external write invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, external write becomes 
visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.1 (session ADD COLUMN)
+  test("temp view with stored plan preserves schema after session ADD COLUMN") 
{
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 (external ADD COLUMN)
+  test("temp view with stored plan preserves schema after external ADD 
COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
+      catalog("testcat").alterTable(ident, addCol)
+
+      // external writer adds data with new schema
+      externalAppend(
+        catalogName = "testcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
+        row = InternalRow(2, 200, -1))
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+  test("connector w/ cache: temp view stale after external ADD COLUMN") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change + data via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, 
true)
+      catalog("cachingcat").alterTable(ident, addCol)
+
+      externalAppend(
+        catalogName = "cachingcat",
+        ident = ident,
+        schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
+        row = InternalRow(2, 200, -1))
+
+      // Caching connector returns stale table: external changes invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view preserves 
original 2-column schema
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 3.1 (session column removal)
+  test("temp view with stored plan detects session column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // session schema change via SQL
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 (external column removal)
+  test("temp view with stored plan detects external column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("testcat").alterTable(ident, dropCol)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 connector w/ cache (external column removal, caching 
connector)
+  test("connector w/ cache: temp view stale after external column removal") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external column removal via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("cachingcat").alterTable(ident, dropCol)
+
+      // Caching connector returns stale table: column removal invisible, no 
error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, column removal detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 4.1 (session drop and recreate table)
+  test("temp view with stored plan resolves to session-recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // session drop and recreate via SQL
+      sql(s"DROP TABLE $t")
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 (external drop and recreate table)
+  test("temp view with stored plan resolves to externally recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // external drop and recreate via catalog API
+      catalog("testcat").dropTable(ident)
+      catalog("testcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 connector w/ cache (external drop/recreate, caching 
connector)
+  test("connector w/ cache: temp view stale after external drop/recreate") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and recreate via catalog API
+      catalog("cachingcat").dropTable(ident)
+      catalog("cachingcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      // Caching connector returns stale table: drop/recreate invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view resolves to new 
empty table
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq.empty)
+    }
+  }
+
+  // Scenario 5.1 (session drop and re-add column with same type, multiple 
views)
+  test("temp view with stored plan after session drop and re-add column same 
type" +
+      " with unfiltered view") {

Review Comment:
   This test is a strict superset of the existing `temp view with stored plan 
after session drop and re-add column same type` at line 2511 — same scenario, 
same `v` (`salary < 999`) and `v_filter_is_null` (`salary IS NULL`) views, just 
one extra `v_no_filter`. Either delete the existing test in favor of this one, 
or fold the `v_no_filter` assertion into the existing test and drop this new 
one — carrying both is redundant.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,584 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+  // Each test creates a DSv2 table with initial data, builds a temp view with 
a filter
+  // (to demonstrate that the stored plan is non-trivial), and then verifies 
the view
+  // behavior after various table modifications (session or external).
+
+  /** Appends rows to a DSv2 table via the catalog API, bypassing the session. 
*/
+  private def externalAppend(
+      catalogName: String,
+      ident: Identifier,
+      schema: StructType,
+      row: InternalRow): Unit = {
+    val extTable = catalog(catalogName).loadTable(ident,
+      util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+    extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+  }

Review Comment:
   The `schema` parameter is currently derived by hand at every call site (e.g. 
`StructType.fromDDL("id INT, salary INT, new_column INT")`). It can be read off 
the table after `loadTable`, which removes a class of "schema parameter 
desynced from table state after `alterTable`" bugs:
   
   ```scala
   private def externalAppend(
       catalogName: String,
       ident: Identifier,
       row: InternalRow): Unit = {
     val extTable = catalog(catalogName).loadTable(ident,
       util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
     val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
     extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
   }
   ```
   
   The row layout must still match the current table column order, so worth a 
one-line comment to that effect. Current callers are correct, but the API as 
written encourages keeping two schema copies in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to