This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 481bc58bddb6 [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
481bc58bddb6 is described below
commit 481bc58bddb6b998386b320e61a1b9f0e73c4711
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Dec 26 15:17:30 2023 +0800
[SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table
What changes were proposed in this pull request? It's a perf regression in CREATE TABLE if we switch to the v2 command framework, as `V2SessionCatalog#createTable` does an extra table lookup, which does not happen in v1. This PR fixes it by allowing `TableCatalog#createTable` to return null, and Spark will call `loadTable` to get the new table metadata in the case of CTAS. This PR also fixes `alterTable` in the same way.
Why are the changes needed? To fix a perf regression in v2. The perf of a single command may not matter, but in a cluster with many Spark applications, it's important to reduce the RPCs to the metastore.
Does this PR introduce any user-facing change? No.
How was this patch tested? Existing tests.
Was this patch authored or co-authored using generative AI tooling? No.
Closes #44377 from cloud-fan/create-table.
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/connector/catalog/TableCatalog.java | 8 +- .../datasources/v2/V2SessionCatalog.scala | 6 +- .../datasources/v2/WriteToDataSourceV2Exec.scala | 8 +- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 13 +- .../sql/connector/TestV2SessionCatalogBase.scala | 5 +- .../datasources/v2/V2SessionCatalogSuite.scala | 181 +++++++++++++-------- 6 files changed, 139 insertions(+), 82 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 387477d0f191..d1951a7f7fbf 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -208,7 +208,9 @@ public interface TableCatalog extends CatalogPlugin { * @param columns the columns of the new table. * @param partitions transforms to use for partitioning data in the table * @param properties a string map of table properties - * @return metadata for the new table + * @return metadata for the new table. This can be null if getting the metadata for the new table + * is expensive. Spark will call {@link #loadTable(Identifier)} if needed (e.g. CTAS). + * * @throws TableAlreadyExistsException If a table or view already exists for the identifier * @throws UnsupportedOperationException If a requested partition transform is not supported * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) @@ -242,7 +244,9 @@ public interface TableCatalog extends CatalogPlugin { * * @param ident a table identifier * @param changes changes to apply to the table - * @return updated metadata for the table + * @return updated metadata for the table. This can be null if getting the metadata for the + * updated table is expensive. 
Spark always discard the returned table here. + * * @throws NoSuchTableException If the table doesn't exist or is a view * @throws IllegalArgumentException If any change is rejected by the implementation. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index d7ab23cf08dd..a022a01455a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -148,7 +148,7 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - loadTable(ident) + null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT. } private def toOptions(properties: Map[String, String]): Map[String, String] = { @@ -189,7 +189,7 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.noSuchTableError(ident) } - loadTable(ident) + null // Return null to save the `loadTable` call for ALTER TABLE. 
} override def purgeTable(ident: Identifier): Boolean = { @@ -233,8 +233,6 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.tableAlreadyExistsError(newIdent) } - // Load table to make sure the table exists - loadTable(oldIdent) catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 4a9b85450a17..89c879beda82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode} import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections} import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage} @@ -81,9 +81,10 @@ case class CreateTableAsSelectExec( } throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - val table = catalog.createTable( + val table = Option(catalog.createTable( ident, 
getV2Columns(query.schema, catalog.useNullableQuerySchema), partitioning.toArray, properties.asJava) + ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava)) writeToTable(catalog, table, writeOptions, ident, query) } } @@ -161,9 +162,10 @@ case class ReplaceTableAsSelectExec( } else if (!orCreate) { throw QueryCompilationErrors.cannotReplaceMissingTableError(ident) } - val table = catalog.createTable( + val table = Option(catalog.createTable( ident, getV2Columns(query.schema, catalog.useNullableQuerySchema), partitioning.toArray, properties.asJava) + ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava)) writeToTable(catalog, table, writeOptions, ident, query) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 27a0b731021e..6ae182810b8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -3486,19 +3486,19 @@ class DataSourceV2SQLSuiteV1Filter // Test CTAS withTable(tbl) { - // assertPrivilegeError(sql(s"CREATE TABLE $tbl AS SELECT 1 i"), "INSERT") + assertPrivilegeError(sql(s"CREATE TABLE $tbl AS SELECT 1 i"), "INSERT") } withTable(tbl) { - // assertPrivilegeError(sql(s"CREATE OR REPLACE TABLE $tbl AS SELECT 1 i"), "INSERT") + assertPrivilegeError(sql(s"CREATE OR REPLACE TABLE $tbl AS SELECT 1 i"), "INSERT") } withTable(tbl) { - // assertPrivilegeError(input.write.saveAsTable(tbl), "INSERT") + assertPrivilegeError(input.write.saveAsTable(tbl), "INSERT") } withTable(tbl) { - // assertPrivilegeError(input.writeTo(tbl).create(), "INSERT") + assertPrivilegeError(input.writeTo(tbl).create(), "INSERT") } withTable(tbl) { - // assertPrivilegeError(input.writeTo(tbl).createOrReplace(), "INSERT") + assertPrivilegeError(input.writeTo(tbl).createOrReplace(), 
"INSERT") } } } @@ -3560,7 +3560,7 @@ class V2CatalogSupportBuiltinDataSource extends InMemoryCatalog { partitions: Array[Transform], properties: jutil.Map[String, String]): Table = { super.createTable(ident, columns, partitions, properties) - loadTable(ident) + null } override def loadTable(ident: Identifier): Table = { @@ -3591,6 +3591,7 @@ class ReadOnlyCatalog extends InMemoryCatalog { partitions: Array[Transform], properties: jutil.Map[String, String]): Table = { super.createTable(ident, columns, partitions, properties) + null } override def loadTable( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 9042231bdc59..9144fb939045 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -78,6 +78,7 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating schema: StructType, partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY val newProps = new java.util.HashMap[String, String]() newProps.putAll(properties) @@ -96,8 +97,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating } else { newProps } - val created = super.createTable(ident, schema, partitions, propsWithLocation) - val t = newTable(created.name(), schema, partitions, propsWithLocation) + super.createTable(ident, schema, partitions, propsWithLocation) + val t = newTable(ident.quoted, schema, partitions, propsWithLocation) addTable(ident, t) t } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 8f5996438e20..01033cd681b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -125,7 +125,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) @@ -143,7 +144,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) @@ -158,7 +160,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) .map(part => quoteIdentifier(part)).mkString(".") @@ -185,26 +188,30 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t1 = 
catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) catalog.dropTable(testIdent) // relative path properties.put(TableCatalog.PROP_LOCATION, "relative/path") - val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) catalog.dropTable(testIdent) // absolute path without scheme properties.put(TableCatalog.PROP_LOCATION, "/absolute/path") - val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t3.catalogTable.location.toString === "file:///absolute/path") catalog.dropTable(testIdent) // absolute path with scheme properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path") - val t4 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t4.catalogTable.location.toString === "file:/absolute/path") catalog.dropTable(testIdent) } @@ -226,12 +233,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("loadTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val loaded = catalog.loadTable(testIdent) - assert(table.name == loaded.name) - assert(table.schema == loaded.schema) - assert(table.properties == loaded.properties) + assert(loaded.name == testIdent.toString) + assert(loaded.schema == schema) } test("loadTable: table does not 
exist") { @@ -247,7 +253,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("invalidateTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) catalog.invalidateTable(testIdent) val loaded = catalog.loadTable(testIdent) @@ -268,11 +275,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map()) - val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1")) + catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1")) val loaded = catalog.loadTable(testIdent) @@ -287,11 +296,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) - val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2")) + catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1", "prop-2" -> "2")) val loaded = catalog.loadTable(testIdent) @@ -306,11 +317,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val 
properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) - val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map()) val loaded = catalog.loadTable(testIdent) @@ -322,11 +335,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: remove missing property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map()) - val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map()) val loaded = catalog.loadTable(testIdent) @@ -338,11 +353,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) + catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) + val updated = catalog.loadTable(testIdent) assert(updated.schema == schema.add("ts", 
TimestampType)) } @@ -350,12 +367,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add required column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false)) + val updated = catalog.loadTable(testIdent) assert(updated.schema == schema.add("ts", TimestampType, nullable = false)) } @@ -363,12 +382,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add column with comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false, "comment text")) + val updated = catalog.loadTable(testIdent) val field = StructField("ts", TimestampType, nullable = false).withComment("comment text") assert(updated.schema == schema.add(field)) @@ -380,12 +401,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.addColumn(Array("point", "z"), DoubleType)) + val updated = catalog.loadTable(testIdent) val 
expectedSchema = schema.add("point", pointStruct.add("z", DoubleType)) @@ -395,7 +418,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add column to primitive field fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -413,7 +437,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add field to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -429,11 +454,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update column data type") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) + catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("id", LongType).add("data", StringType) assert(updated.schema == expectedSchema) @@ -445,12 +472,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val originalSchema = new StructType() .add("id", IntegerType, nullable = false) .add("data", StringType) - val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema 
== originalSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.updateColumnNullability(Array("id"), true)) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType) assert(updated.schema == expectedSchema) @@ -459,7 +488,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -475,12 +505,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType() .add("id", IntegerType, nullable = true, "comment text") @@ -491,7 +523,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: replace comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -501,8 +534,9 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { .add("id", IntegerType, nullable = true, "replacement comment") .add("data", StringType) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, 
TableChange.updateColumnComment(Array("id"), "replacement comment")) + val updated = catalog.loadTable(testIdent) assert(updated.schema == expectedSchema) } @@ -510,7 +544,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -526,11 +561,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) + catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType) @@ -543,12 +580,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first")) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType) val expectedSchema = schema.add("point", 
newPointStruct) @@ -562,12 +601,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, - TableChange.renameColumn(Array("point"), "p")) + catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p")) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val expectedSchema = schema.add("p", newPointStruct) @@ -578,7 +618,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -597,13 +638,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first"), TableChange.renameColumn(Array("point", "y"), "second")) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType) val expectedSchema = 
schema.add("point", newPointStruct) @@ -614,12 +657,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: delete top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, - TableChange.deleteColumn(Array("id"), false)) + catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false)) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("data", StringType) assert(updated.schema == expectedSchema) @@ -631,12 +675,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, - TableChange.deleteColumn(Array("point", "y"), false)) + catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false)) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("x", DoubleType) val expectedSchema = schema.add("point", newPointStruct) @@ -647,7 +692,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: delete missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -669,7 +715,8 @@ class V2SessionCatalogTableSuite extends 
V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) @@ -700,23 +747,27 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) // relative path - val t2 = catalog.alterTable(testIdent, - TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table] + catalog.alterTable(testIdent, + TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")) + val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) // absolute path without scheme - val t3 = catalog.alterTable(testIdent, - TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table] + catalog.alterTable(testIdent, + TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")) + val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t3.catalogTable.location.toString === "file:///absolute/path") // absolute path with scheme - val t4 = catalog.alterTable(testIdent, TableChange.setProperty( - TableCatalog.PROP_LOCATION, "file:/absolute/path")).asInstanceOf[V1Table] + catalog.alterTable(testIdent, TableChange.setProperty( + TableCatalog.PROP_LOCATION, "file:/absolute/path")) + val t4 = 
catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t4.catalogTable.location.toString === "file:/absolute/path") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org