This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 481bc58bddb6 [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
481bc58bddb6 is described below

commit 481bc58bddb6b998386b320e61a1b9f0e73c4711
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Dec 26 15:17:30 2023 +0800

    [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
    
    Switching to the v2 command framework introduced a perf regression in
    CREATE TABLE, as `V2SessionCatalog#createTable` does an extra table lookup
    that does not happen in v1. This PR fixes it by allowing
    `TableCatalog#createTable` to return null, in which case Spark calls
    `loadTable` to get the new table metadata only when needed (i.e. for
    CTAS). This PR also fixes `alterTable` in the same way.
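
    With the relaxed contract, a catalog implementation can skip the metadata
    round-trip after a successful create/alter. A minimal sketch (not part of
    this PR; the class name is hypothetical, and it extends
    DelegatingCatalogExtension only to keep the sketch short):

        import java.util
        import org.apache.spark.sql.connector.catalog._
        import org.apache.spark.sql.connector.expressions.Transform

        class RemoteMetastoreCatalog extends DelegatingCatalogExtension {
          override def createTable(
              ident: Identifier,
              columns: Array[Column],
              partitions: Array[Transform],
              properties: util.Map[String, String]): Table = {
            // Create the table (one metastore RPC), then return null instead
            // of loading it back. Spark will call loadTable(ident) itself only
            // when it actually needs the metadata, e.g. to write the query
            // output in CTAS.
            super.createTable(ident, columns, partitions, properties)
            null
          }

          override def alterTable(ident: Identifier, changes: TableChange*): Table = {
            // Spark always discards alterTable's return value, so returning
            // null here is always safe and saves one lookup.
            super.alterTable(ident, changes: _*)
            null
          }
        }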
    
    This fixes a perf regression in v2. The perf of a single command may not
    matter, but in a cluster with many Spark applications it is important to
    reduce the number of RPCs to the metastore.
    
    Does this PR introduce any user-facing change?
    no

    How was this patch tested?
    existing tests

    Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44377 from cloud-fan/create-table.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/connector/catalog/TableCatalog.java  |   8 +-
 .../datasources/v2/V2SessionCatalog.scala          |   6 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |   8 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  13 +-
 .../sql/connector/TestV2SessionCatalogBase.scala   |   5 +-
 .../datasources/v2/V2SessionCatalogSuite.scala     | 181 +++++++++++++--------
 6 files changed, 139 insertions(+), 82 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 387477d0f191..d1951a7f7fbf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -208,7 +208,9 @@ public interface TableCatalog extends CatalogPlugin {
    * @param columns the columns of the new table.
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
-   * @return metadata for the new table
+   * @return metadata for the new table. This can be null if getting the metadata for the new table
+   *         is expensive. Spark will call {@link #loadTable(Identifier)} if needed (e.g. CTAS).
+   *
   * @throws TableAlreadyExistsException If a table or view already exists for the identifier
   * @throws UnsupportedOperationException If a requested partition transform is not supported
   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
@@ -242,7 +244,9 @@ public interface TableCatalog extends CatalogPlugin {
    *
    * @param ident a table identifier
    * @param changes changes to apply to the table
-   * @return updated metadata for the table
+   * @return updated metadata for the table. This can be null if getting the metadata for the
+   *         updated table is expensive. Spark always discards the returned table here.
+   *
   * @throws NoSuchTableException If the table doesn't exist or is a view
   * @throws IllegalArgumentException If any change is rejected by the implementation.
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index d7ab23cf08dd..a022a01455a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -148,7 +148,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
         throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
 
-    loadTable(ident)
+    null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT.
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
@@ -189,7 +189,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
         throw QueryCompilationErrors.noSuchTableError(ident)
     }
 
-    loadTable(ident)
+    null // Return null to save the `loadTable` call for ALTER TABLE.
   }
 
   override def purgeTable(ident: Identifier): Boolean = {
@@ -233,8 +233,6 @@ class V2SessionCatalog(catalog: SessionCatalog)
       throw QueryCompilationErrors.tableAlreadyExistsError(newIdent)
     }
 
-    // Load table to make sure the table exists
-    loadTable(oldIdent)
     catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 4a9b85450a17..89c879beda82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
@@ -81,9 +81,10 @@ case class CreateTableAsSelectExec(
       }
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val table = catalog.createTable(
+    val table = Option(catalog.createTable(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
+    ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -161,9 +162,10 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val table = catalog.createTable(
+    val table = Option(catalog.createTable(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
+    ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 27a0b731021e..6ae182810b8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3486,19 +3486,19 @@ class DataSourceV2SQLSuiteV1Filter
 
         // Test CTAS
         withTable(tbl) {
-          // assertPrivilegeError(sql(s"CREATE TABLE $tbl AS SELECT 1 i"), 
"INSERT")
+          assertPrivilegeError(sql(s"CREATE TABLE $tbl AS SELECT 1 i"), 
"INSERT")
         }
         withTable(tbl) {
-          // assertPrivilegeError(sql(s"CREATE OR REPLACE TABLE $tbl AS SELECT 
1 i"), "INSERT")
+          assertPrivilegeError(sql(s"CREATE OR REPLACE TABLE $tbl AS SELECT 1 
i"), "INSERT")
         }
         withTable(tbl) {
-          // assertPrivilegeError(input.write.saveAsTable(tbl), "INSERT")
+          assertPrivilegeError(input.write.saveAsTable(tbl), "INSERT")
         }
         withTable(tbl) {
-          // assertPrivilegeError(input.writeTo(tbl).create(), "INSERT")
+          assertPrivilegeError(input.writeTo(tbl).create(), "INSERT")
         }
         withTable(tbl) {
-          // assertPrivilegeError(input.writeTo(tbl).createOrReplace(), "INSERT")
+          assertPrivilegeError(input.writeTo(tbl).createOrReplace(), "INSERT")
         }
       }
     }
@@ -3560,7 +3560,7 @@ class V2CatalogSupportBuiltinDataSource extends InMemoryCatalog {
       partitions: Array[Transform],
       properties: jutil.Map[String, String]): Table = {
     super.createTable(ident, columns, partitions, properties)
-    loadTable(ident)
+    null
   }
 
   override def loadTable(ident: Identifier): Table = {
@@ -3591,6 +3591,7 @@ class ReadOnlyCatalog extends InMemoryCatalog {
       partitions: Array[Transform],
       properties: jutil.Map[String, String]): Table = {
     super.createTable(ident, columns, partitions, properties)
+    null
   }
 
   override def loadTable(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 9042231bdc59..9144fb939045 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -78,6 +78,7 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
       schema: StructType,
       partitions: Array[Transform],
       properties: java.util.Map[String, String]): Table = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
     val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
     val newProps = new java.util.HashMap[String, String]()
     newProps.putAll(properties)
@@ -96,8 +97,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
     } else {
       newProps
     }
-    val created = super.createTable(ident, schema, partitions, propsWithLocation)
-    val t = newTable(created.name(), schema, partitions, propsWithLocation)
+    super.createTable(ident, schema, partitions, propsWithLocation)
+    val t = newTable(ident.quoted, schema, partitions, propsWithLocation)
     addTable(ident, t)
     t
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 8f5996438e20..01033cd681b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -125,7 +125,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("db", "test_table"))
@@ -143,7 +144,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val table = catalog.loadTable(testIdent)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("db", "test_table"))
@@ -158,7 +160,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
       .map(part => quoteIdentifier(part)).mkString(".")
@@ -185,26 +188,30 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t1.catalogTable.location ===
       spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
     catalog.dropTable(testIdent)
 
     // relative path
     properties.put(TableCatalog.PROP_LOCATION, "relative/path")
-    val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
     catalog.dropTable(testIdent)
 
     // absolute path without scheme
     properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
-    val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t3.catalogTable.location.toString === "file:///absolute/path")
     catalog.dropTable(testIdent)
 
     // absolute path with scheme
     properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
-    val t4 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t4.catalogTable.location.toString === "file:/absolute/path")
     catalog.dropTable(testIdent)
   }
@@ -226,12 +233,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("loadTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     val loaded = catalog.loadTable(testIdent)
 
-    assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
-    assert(table.properties == loaded.properties)
+    assert(loaded.name == testIdent.toString)
+    assert(loaded.schema == schema)
   }
 
   test("loadTable: table does not exist") {
@@ -247,7 +253,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("invalidateTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
     catalog.invalidateTable(testIdent)
 
     val loaded = catalog.loadTable(testIdent)
@@ -268,11 +275,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map())
 
-    val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+    catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1"))
 
     val loaded = catalog.loadTable(testIdent)
@@ -287,11 +296,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
 
-    val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+    catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1", "prop-2" -> "2"))
 
     val loaded = catalog.loadTable(testIdent)
@@ -306,11 +317,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
+    catalog.createTable(testIdent, schema, emptyTrans, properties)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
 
-    val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map())
 
     val loaded = catalog.loadTable(testIdent)
@@ -322,11 +335,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: remove missing property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(filterV2TableProperties(table.properties) == Map())
 
-    val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    catalog.alterTable(testIdent, TableChange.removeProperty("prop-1"))
+    val updated = catalog.loadTable(testIdent)
     assert(filterV2TableProperties(updated.properties) == Map())
 
     val loaded = catalog.loadTable(testIdent)
@@ -338,11 +353,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+    catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType))
+    val updated = catalog.loadTable(testIdent)
 
     assert(updated.schema == schema.add("ts", TimestampType))
   }
@@ -350,12 +367,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add required column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.addColumn(Array("ts"), TimestampType, false))
+    val updated = catalog.loadTable(testIdent)
 
     assert(updated.schema == schema.add("ts", TimestampType, nullable = false))
   }
@@ -363,12 +382,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add column with comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.addColumn(Array("ts"), TimestampType, false, "comment text"))
+    val updated = catalog.loadTable(testIdent)
 
     val field = StructField("ts", TimestampType, nullable = false).withComment("comment text")
     assert(updated.schema == schema.add(field))
@@ -380,12 +401,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.addColumn(Array("point", "z"), DoubleType))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType))
 
@@ -395,7 +418,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add column to primitive field fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -413,7 +437,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add field to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -429,11 +454,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: update column data type") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+    catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
     assert(updated.schema == expectedSchema)
@@ -445,12 +472,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val originalSchema = new StructType()
         .add("id", IntegerType, nullable = false)
         .add("data", StringType)
-    val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == originalSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.updateColumnNullability(Array("id"), true))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
     assert(updated.schema == expectedSchema)
@@ -459,7 +488,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: update missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -475,12 +505,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "comment text"))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType()
         .add("id", IntegerType, nullable = true, "comment text")
@@ -491,7 +523,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: replace comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -501,8 +534,9 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
         .add("id", IntegerType, nullable = true, "replacement comment")
         .add("data", StringType)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "replacement comment"))
+    val updated = catalog.loadTable(testIdent)
 
     assert(updated.schema == expectedSchema)
   }
@@ -510,7 +544,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add comment to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -526,11 +561,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: rename top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+    catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
 
@@ -543,12 +580,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.renameColumn(Array("point", "x"), "first"))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType)
     val expectedSchema = schema.add("point", newPointStruct)
@@ -562,12 +601,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
-      TableChange.renameColumn(Array("point"), "p"))
+    catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val expectedSchema = schema.add("p", newPointStruct)
@@ -578,7 +618,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: rename missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -597,13 +638,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
+    catalog.alterTable(testIdent,
       TableChange.renameColumn(Array("point", "x"), "first"),
       TableChange.renameColumn(Array("point", "y"), "second"))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType)
     val expectedSchema = schema.add("point", newPointStruct)
@@ -614,12 +657,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: delete top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
-    val updated = catalog.alterTable(testIdent,
-      TableChange.deleteColumn(Array("id"), false))
+    catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
+    val updated = catalog.loadTable(testIdent)
 
     val expectedSchema = new StructType().add("data", StringType)
     assert(updated.schema == expectedSchema)
@@ -631,12 +675,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
-    val updated = catalog.alterTable(testIdent,
-      TableChange.deleteColumn(Array("point", "y"), false))
+    catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false))
+    val updated = catalog.loadTable(testIdent)
 
     val newPointStruct = new StructType().add("x", DoubleType)
     val expectedSchema = schema.add("point", newPointStruct)
@@ -647,7 +692,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: delete missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == schema)
 
@@ -669,7 +715,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
+    val table = catalog.loadTable(testIdent)
 
     assert(table.schema == tableSchema)
 
@@ -700,23 +747,27 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table]
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t1.catalogTable.location ===
       spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
 
     // relative path
-    val t2 = catalog.alterTable(testIdent,
-      TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
+    catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path"))
+    val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
 
     // absolute path without scheme
-    val t3 = catalog.alterTable(testIdent,
-      TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
+    catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path"))
+    val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t3.catalogTable.location.toString === "file:///absolute/path")
 
     // absolute path with scheme
-    val t4 = catalog.alterTable(testIdent, TableChange.setProperty(
-      TableCatalog.PROP_LOCATION, "file:/absolute/path")).asInstanceOf[V1Table]
+    catalog.alterTable(testIdent, TableChange.setProperty(
+      TableCatalog.PROP_LOCATION, "file:/absolute/path"))
+    val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table]
     assert(t4.catalogTable.location.toString === "file:/absolute/path")
   }
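
A caller-side note: third-party code that invokes `TableCatalog#createTable`
directly and uses the returned `Table` must now tolerate a null result, in the
same way as the `CreateTableAsSelectExec` change above (a hedged sketch;
`catalog`, `ident`, `columns`, `partitions` and `props` are assumed to be in
scope):

    // createTable may legitimately return null after this change; fall back
    // to loadTable only when the table metadata is actually needed.
    val table: Table = Option(catalog.createTable(ident, columns, partitions, props))
      .getOrElse(catalog.loadTable(ident))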
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
