This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d8fb91e61352 [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table d8fb91e61352 is described below commit d8fb91e61352e57e733e7d7e4978c8ce555454b1 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Dec 26 15:17:30 2023 +0800 [SPARK-46444][SQL] V2SessionCatalog#createTable should not load the table ### What changes were proposed in this pull request? Switching CREATE TABLE to the v2 command framework causes a perf regression, because `V2SessionCatalog#createTable` does an extra table lookup that v1 does not do. This PR fixes it by allowing `TableCatalog#createTable` to return null; Spark then calls `loadTable` to get the new table metadata only when it needs it, e.g. for CTAS. This PR also fixes `alterTable` in the same way, and removes the redundant `loadTable` call from `renameTable`. ### Why are the changes needed? To fix a perf regression in v2. The perf of a single command may not matter, but in a cluster with many Spark applications, it's important to reduce the RPCs to the metastore. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #44377 from cloud-fan/create-table. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../src/main/resources/error/error-classes.json | 4 +- docs/sql-error-conditions.md | 4 +- .../spark/sql/connector/catalog/TableCatalog.java | 8 +- .../spark/sql/errors/QueryCompilationErrors.scala | 6 +- .../datasources/v2/V2SessionCatalog.scala | 37 ++--- .../datasources/v2/WriteToDataSourceV2Exec.scala | 8 +- .../spark/sql/connector/DataSourceV2Suite.scala | 12 +- .../datasources/v2/V2SessionCatalogSuite.scala | 181 +++++++++++++-------- 8 files changed, 157 insertions(+), 103 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 8970045d4ab3..700b1ed07513 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -895,7 +895,9 @@ }, "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : { "message" : [ - "The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema." + "The schema of the data source table does not match the expected schema. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.", + "Data Source schema: <dsSchema>", + "Expected schema: <expectedSchema>" ], "sqlState" : "42K03" }, diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 0722cae5815e..a8d2b6c894bc 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -496,7 +496,9 @@ Failed to find the data source: `<provider>`. Please find packages at `https://s [SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) -The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. 
If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema. +The schema of the data source table does not match the expected schema. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema. +Data Source schema: `<dsSchema>` +Expected schema: `<expectedSchema>` ### DATETIME_OVERFLOW diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 6642adc33548..74700789dde0 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -187,7 +187,9 @@ public interface TableCatalog extends CatalogPlugin { * @param columns the columns of the new table. * @param partitions transforms to use for partitioning data in the table * @param properties a string map of table properties - * @return metadata for the new table + * @return metadata for the new table. This can be null if getting the metadata for the new table + * is expensive. Spark will call {@link #loadTable(Identifier)} if needed (e.g. CTAS). + * * @throws TableAlreadyExistsException If a table or view already exists for the identifier * @throws UnsupportedOperationException If a requested partition transform is not supported * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) @@ -221,7 +223,9 @@ public interface TableCatalog extends CatalogPlugin { * * @param ident a table identifier * @param changes changes to apply to the table - * @return updated metadata for the table + * @return updated metadata for the table. This can be null if getting the metadata for the + * updated table is expensive. Spark always discard the returned table here. 
+ * * @throws NoSuchTableException If the table doesn't exist or is a view * @throws IllegalArgumentException If any change is rejected by the implementation. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index bb2b7e7ae066..ee41cbe2f50e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3926,11 +3926,11 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat } def dataSourceTableSchemaMismatchError( - tableSchema: StructType, actualSchema: StructType): Throwable = { + dsSchema: StructType, expectedSchema: StructType): Throwable = { new AnalysisException( errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", messageParameters = Map( - "tableSchema" -> toSQLType(tableSchema), - "actualSchema" -> toSQLType(actualSchema))) + "dsSchema" -> toSQLType(dsSchema), + "expectedSchema" -> toSQLType(expectedSchema))) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index a7694f5d829d..e7445e970fa5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -198,10 +198,21 @@ class V2SessionCatalog(catalog: SessionCatalog) s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}") // Infer the schema and partitions and store them in the catalog. 
(tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions)) - } else if (partitions.isEmpty) { - (schema, tableProvider.inferPartitioning(dsOptions)) } else { - (schema, partitions) + val partitioning = if (partitions.isEmpty) { + tableProvider.inferPartitioning(dsOptions) + } else { + partitions + } + val table = tableProvider.getTable(schema, partitions, dsOptions) + // Check if the schema of the created table matches the given schema. + val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema( + table.columns().asSchema) + if (!DataType.equalsIgnoreNullability(tableSchema, schema)) { + throw QueryCompilationErrors.dataSourceTableSchemaMismatchError( + tableSchema, schema) + } + (schema, partitioning) } case _ => @@ -233,21 +244,7 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - val table = loadTable(ident) - - // Check if the schema of the created table matches the given schema. - // TODO: move this check in loadTable to match the behavior with - // existing file data sources. - if (schema.nonEmpty) { - val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema( - table.columns().asSchema) - if (!DataType.equalsIgnoreNullability(tableSchema, schema)) { - throw QueryCompilationErrors.dataSourceTableSchemaMismatchError( - table.columns().asSchema, schema) - } - } - - table + null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT. } private def toOptions(properties: Map[String, String]): Map[String, String] = { @@ -288,7 +285,7 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.noSuchTableError(ident) } - loadTable(ident) + null // Return null to save the `loadTable` call for ALTER TABLE. 
} override def purgeTable(ident: Identifier): Boolean = { @@ -332,8 +329,6 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.tableAlreadyExistsError(newIdent) } - // Load table to make sure the table exists - loadTable(oldIdent) catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 97c1f7ced508..c65c15fb0ef2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -82,9 +82,9 @@ case class CreateTableAsSelectExec( } throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - val table = catalog.createTable( + val table = Option(catalog.createTable( ident, getV2Columns(query.schema, catalog.useNullableQuerySchema), - partitioning.toArray, properties.asJava) + partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident)) writeToTable(catalog, table, writeOptions, ident, query) } } @@ -162,9 +162,9 @@ case class ReplaceTableAsSelectExec( } else if (!orCreate) { throw QueryCompilationErrors.cannotReplaceMissingTableError(ident) } - val table = catalog.createTable( + val table = Option(catalog.createTable( ident, getV2Columns(query.schema, catalog.useNullableQuerySchema), - partitioning.toArray, properties.asJava) + partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident)) writeToTable(catalog, table, writeOptions, ident, query) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 3f3dc82da5ad..ea263b36c76c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -734,8 +734,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS }, errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", parameters = Map( - "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"", - "actualSchema" -> "\"STRUCT<x: INT, y: INT>\"")) + "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"", + "expectedSchema" -> "\"STRUCT<x: INT, y: INT>\"")) } } @@ -772,8 +772,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS }, errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", parameters = Map( - "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"", - "actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\"")) + "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"", + "expectedSchema" -> "\"STRUCT<col1: INT, col2: INT>\"")) } } @@ -790,8 +790,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS }, errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", parameters = Map( - "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"", - "actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\"")) + "dsSchema" -> "\"STRUCT<i: INT, j: INT>\"", + "expectedSchema" -> "\"STRUCT<i: STRING, j: STRING>\"")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index f9da55ed6ba3..e5473222d429 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -125,7 +125,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + 
catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) @@ -143,7 +144,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) @@ -158,7 +160,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) .map(part => quoteIdentifier(part)).mkString(".") @@ -185,26 +188,30 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) catalog.dropTable(testIdent) // relative path properties.put(TableCatalog.PROP_LOCATION, "relative/path") - val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) 
catalog.dropTable(testIdent) // absolute path without scheme properties.put(TableCatalog.PROP_LOCATION, "/absolute/path") - val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t3.catalogTable.location.toString === "file:///absolute/path") catalog.dropTable(testIdent) // absolute path with scheme properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path") - val t4 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, properties) + val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t4.catalogTable.location.toString === "file:/absolute/path") catalog.dropTable(testIdent) } @@ -226,12 +233,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("loadTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val loaded = catalog.loadTable(testIdent) - assert(table.name == loaded.name) - assert(table.schema == loaded.schema) - assert(table.properties == loaded.properties) + assert(loaded.name == testIdent.toString) + assert(loaded.schema == schema) } test("loadTable: table does not exist") { @@ -247,7 +253,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("invalidateTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) catalog.invalidateTable(testIdent) val loaded = catalog.loadTable(testIdent) @@ -268,11 +275,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add property") { val catalog = newCatalog() - val table = 
catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map()) - val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1")) + catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1")) val loaded = catalog.loadTable(testIdent) @@ -287,11 +296,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) - val updated = catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2")) + catalog.alterTable(testIdent, TableChange.setProperty("prop-2", "2")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map("prop-1" -> "1", "prop-2" -> "2")) val loaded = catalog.loadTable(testIdent) @@ -306,11 +317,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) - val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + val updated = catalog.loadTable(testIdent) 
assert(filterV2TableProperties(updated.properties) == Map()) val loaded = catalog.loadTable(testIdent) @@ -322,11 +335,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: remove missing property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map()) - val updated = catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + catalog.alterTable(testIdent, TableChange.removeProperty("prop-1")) + val updated = catalog.loadTable(testIdent) assert(filterV2TableProperties(updated.properties) == Map()) val loaded = catalog.loadTable(testIdent) @@ -338,11 +353,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) + catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) + val updated = catalog.loadTable(testIdent) assert(updated.schema == schema.add("ts", TimestampType)) } @@ -350,12 +367,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add required column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false)) + val 
updated = catalog.loadTable(testIdent) assert(updated.schema == schema.add("ts", TimestampType, nullable = false)) } @@ -363,12 +382,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add column with comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false, "comment text")) + val updated = catalog.loadTable(testIdent) val field = StructField("ts", TimestampType, nullable = false).withComment("comment text") assert(updated.schema == schema.add(field)) @@ -380,12 +401,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.addColumn(Array("point", "z"), DoubleType)) + val updated = catalog.loadTable(testIdent) val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType)) @@ -395,7 +418,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add column to primitive field fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -413,7 +437,8 @@ class V2SessionCatalogTableSuite extends 
V2SessionCatalogBaseSuite { test("alterTable: add field to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -429,11 +454,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update column data type") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) + catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("id", LongType).add("data", StringType) assert(updated.schema == expectedSchema) @@ -445,12 +472,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val originalSchema = new StructType() .add("id", IntegerType, nullable = false) .add("data", StringType) - val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == originalSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.updateColumnNullability(Array("id"), true)) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType) assert(updated.schema == expectedSchema) @@ -459,7 +488,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update missing column fails") { val catalog = newCatalog() - val table = 
catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -475,12 +505,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType() .add("id", IntegerType, nullable = true, "comment text") @@ -491,7 +523,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: replace comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -501,8 +534,9 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { .add("id", IntegerType, nullable = true, "replacement comment") .add("data", StringType) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "replacement comment")) + val updated = catalog.loadTable(testIdent) assert(updated.schema == expectedSchema) } @@ -510,7 +544,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) 
assert(table.schema == schema) @@ -526,11 +561,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) + catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType) @@ -543,12 +580,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first")) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType) val expectedSchema = schema.add("point", newPointStruct) @@ -562,12 +601,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, - 
TableChange.renameColumn(Array("point"), "p")) + catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p")) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val expectedSchema = schema.add("p", newPointStruct) @@ -578,7 +618,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -597,13 +638,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, + catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first"), TableChange.renameColumn(Array("point", "y"), "second")) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType) val expectedSchema = schema.add("point", newPointStruct) @@ -614,12 +657,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: delete top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) - val updated = catalog.alterTable(testIdent, - TableChange.deleteColumn(Array("id"), false)) + 
catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false)) + val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("data", StringType) assert(updated.schema == expectedSchema) @@ -631,12 +675,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) - val updated = catalog.alterTable(testIdent, - TableChange.deleteColumn(Array("point", "y"), false)) + catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false)) + val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("x", DoubleType) val expectedSchema = schema.add("point", newPointStruct) @@ -647,7 +692,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: delete missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == schema) @@ -669,7 +715,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.loadTable(testIdent) assert(table.schema == tableSchema) @@ -700,23 +747,27 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default 
location - val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table] + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) // relative path - val t2 = catalog.alterTable(testIdent, - TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table] + catalog.alterTable(testIdent, + TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")) + val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) // absolute path without scheme - val t3 = catalog.alterTable(testIdent, - TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table] + catalog.alterTable(testIdent, + TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")) + val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t3.catalogTable.location.toString === "file:///absolute/path") // absolute path with scheme - val t4 = catalog.alterTable(testIdent, TableChange.setProperty( - TableCatalog.PROP_LOCATION, "file:/absolute/path")).asInstanceOf[V1Table] + catalog.alterTable(testIdent, TableChange.setProperty( + TableCatalog.PROP_LOCATION, "file:/absolute/path")) + val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t4.catalogTable.location.toString === "file:/absolute/path") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org