This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 688fa23 [SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command 688fa23 is described below commit 688fa239265b981dc3acc69b79443905afe6a8cf Author: dch nguyen <dgd_contribu...@viettel.com.vn> AuthorDate: Fri Dec 3 21:08:41 2021 +0800 [SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command ### What changes were proposed in this pull request? Migrate CreateTableAsSelectStatement to v2 command ### Why are the changes needed? Migrate CreateTableAsSelectStatement to v2 command ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #34667 from dchvn/migrate-CTAS. Authored-by: dch nguyen <dgd_contribu...@viettel.com.vn> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/ResolveCatalogs.scala | 12 ----- .../spark/sql/catalyst/parser/AstBuilder.scala | 12 ++--- .../sql/catalyst/plans/logical/statements.scala | 23 --------- .../sql/catalyst/plans/logical/v2Commands.scala | 23 ++++++--- .../sql/connector/catalog/CatalogV2Util.scala | 7 +-- .../CreateTablePartitioningValidationSuite.scala | 54 ++++++++++++---------- .../spark/sql/catalyst/parser/DDLParserSuite.scala | 26 +++++------ .../org/apache/spark/sql/DataFrameWriter.scala | 48 +++++++++++-------- .../org/apache/spark/sql/DataFrameWriterV2.scala | 28 +++++------ .../catalyst/analysis/ResolveSessionCatalog.scala | 29 +++++------- .../datasources/v2/DataSourceV2Strategy.scala | 12 ++--- .../datasources/v2/WriteToDataSourceV2Exec.scala | 8 +++- .../connector/V2CommandsCaseSensitivitySuite.scala | 14 +++--- .../execution/command/PlanResolutionSuite.scala | 46 +++++------------- 14 files changed, 155 insertions(+), 187 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index d7c6301..3e21a60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -37,18 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) => ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name()) - case c @ CreateTableAsSelectStatement( - NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => - CreateTableAsSelect( - catalog.asTableCatalog, - tbl.asIdentifier, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - c.asSelect, - convertTableProperties(c), - writeOptions = c.writeOptions, - ignoreIfExists = c.ifNotExists) - case c @ ReplaceTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => ReplaceTable( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f7d96f8..235f6a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan. + * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan. * * Expected format: * {{{ @@ -3456,6 +3456,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } val partitioning = partitionExpressions(partTransforms, partCols, ctx) + val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, + serdeInfo, external) Option(ctx.query).map(plan) match { case Some(_) if columns.nonEmpty => @@ -3470,15 +3472,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx) case Some(query) => - CreateTableAsSelectStatement( - table, query, partitioning, bucketSpec, properties, provider, options, location, comment, - writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists) + CreateTableAsSelect( + UnresolvedDBObjectName(table, isNamespace = false), + partitioning, query, tableSpec, Map.empty, ifNotExists) case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. - val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, - serdeInfo, external) val schema = StructType(columns ++ partCols) CreateTable( UnresolvedDBObjectName(table, isNamespace = false), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 70c6f15..20d6894 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -124,29 +124,6 @@ object SerdeInfo { } /** - * A CREATE TABLE AS SELECT command, as parsed from SQL. - */ -case class CreateTableAsSelectStatement( - tableName: Seq[String], - asSelect: LogicalPlan, - partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: Option[String], - options: Map[String, String], - location: Option[String], - comment: Option[String], - writeOptions: Map[String, String], - serde: Option[SerdeInfo], - external: Boolean, - ifNotExists: Boolean) extends UnaryParsedStatement { - - override def child: LogicalPlan = asSelect - override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelectStatement = - copy(asSelect = newChild) -} - -/** * A REPLACE TABLE command, as parsed from SQL. * * If the table exists prior to running this command, executing this statement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d9e5dfe..7428da3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -220,16 +220,22 @@ case class CreateTable( * Create a new table from a select query with a v2 catalog. */ case class CreateTableAsSelect( - catalog: TableCatalog, - tableName: Identifier, + name: LogicalPlan, partitioning: Seq[Transform], query: LogicalPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: Map[String, String], - ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { + ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper override def tableSchema: StructType = query.schema - override def child: LogicalPlan = query + override def left: LogicalPlan = name + override def right: LogicalPlan = query + + override def tableName: Identifier = { + assert(left.resolved) + left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier + } override lazy val resolved: Boolean = childrenResolved && { // the table schema is created from the query schema, so the only resolution needed is to check @@ -242,8 +248,11 @@ case class CreateTableAsSelect( this.copy(partitioning = rewritten) } - override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelect = - copy(query = newChild) + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, + newRight: LogicalPlan + ): CreateTableAsSelect = + copy(name = newLeft, query = newRight) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 44e57f2..4c5ccc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -23,7 +23,7 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -305,11 +305,6 @@ private[sql] object CatalogV2Util { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } - def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = { - convertTableProperties( - c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) - } - def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = { convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala index aee8e31..41b22bc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -20,22 +20,22 @@ package org.apache.spark.sql.catalyst.analysis import java.util import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode} -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCapability, TableCatalog} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec} +import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog} import org.apache.spark.sql.connector.expressions.Expressions import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class CreateTablePartitioningValidationSuite extends AnalysisTest { - import CreateTablePartitioningValidationSuite._ test("CreateTableAsSelect: fail missing top-level column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -46,12 +46,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing top-level column nested reference") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -62,12 +63,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing nested column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -78,12 +80,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail with multiple errors") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "does_not_exist", "point.z") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -95,12 +98,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success with top-level column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "id") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -108,12 +112,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using nested column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point.x") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -121,12 +126,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using complex column") { + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, "point") :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 182f028..ba0a70a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -719,8 +719,8 @@ class DDLParserSuite extends AnalysisTest { parsedPlan match { case create: CreateTable if newTableToken == "CREATE" => assert(create.ignoreIfExists == expectedIfNotExists) - case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => - assert(ctas.ifNotExists == expectedIfNotExists) + case ctas: CreateTableAsSelect if newTableToken == "CREATE" => + assert(ctas.ignoreIfExists == expectedIfNotExists) case replace: ReplaceTableStatement if newTableToken == "REPLACE" => case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" => case other => @@ -2310,19 +2310,19 @@ class DDLParserSuite extends AnalysisTest { replace.location, replace.comment, replace.serde) - case ctas: CreateTableAsSelectStatement => + case ctas: CreateTableAsSelect => TableSpec( - ctas.tableName, - Some(ctas.asSelect).filter(_.resolved).map(_.schema), + ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts, + Some(ctas.query).filter(_.resolved).map(_.schema), ctas.partitioning, - ctas.bucketSpec, - ctas.properties, - ctas.provider, - ctas.options, - ctas.location, - ctas.comment, - ctas.serde, - ctas.external) + ctas.tableSpec.bucketSpec, + ctas.tableSpec.properties, + ctas.tableSpec.provider, + ctas.tableSpec.options, + ctas.tableSpec.location, + ctas.tableSpec.comment, + ctas.tableSpec.serde, + ctas.tableSpec.external) case rtas: ReplaceTableAsSelect => TableSpec( rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d5d814d..8e2f9cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -326,15 +326,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val catalog = CatalogV2Util.getTableProviderCatalog( supportsExtract, catalogManager, dsOptions) - val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _) - + val tableSpec = TableSpec( + bucketSpec = None, + properties = Map.empty, + provider = Some(source), + options = Map.empty, + location = extraOptions.get("path"), + comment = extraOptions.get(TableCatalog.PROP_COMMENT), + serde = None, + external = false) runCommand(df.sparkSession) { CreateTableAsSelect( - catalog, - ident, + UnresolvedDBObjectName( + catalog.name +: ident.namespace.toSeq :+ ident.name, + isNamespace = false + ), partitioningAsV2, df.queryExecution.analyzed, - Map(TableCatalog.PROP_PROVIDER -> source) ++ location, + tableSpec, finalOptions, ignoreIfExists = createMode == SaveMode.Ignore) } @@ -607,20 +616,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. - CreateTableAsSelectStatement( - nameParts, - df.queryExecution.analyzed, + val tableSpec = TableSpec( + bucketSpec = None, + properties = Map.empty, + provider = Some(source), + options = Map.empty, + location = extraOptions.get("path"), + comment = extraOptions.get(TableCatalog.PROP_COMMENT), + serde = None, + external = false) + + CreateTableAsSelect( + UnresolvedDBObjectName(nameParts, isNamespace = false), partitioningAsV2, - None, - Map.empty, - Some(source), + df.queryExecution.analyzed, + tableSpec, Map.empty, - extraOptions.get("path"), - extraOptions.get(TableCatalog.PROP_COMMENT), - extraOptions.toMap, - None, - ifNotExists = other == SaveMode.Ignore, - external = false) + other == SaveMode.Ignore) } runCommand(df.sparkSession) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index b99195d..22b2eb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.IntegerType @@ -107,21 +107,23 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } override def create(): Unit = { + val tableSpec = TableSpec( + bucketSpec = None, + properties = properties.toMap, + provider = provider, + options = Map.empty, + location = None, + comment = None, + serde = None, + external = false) runCommand( - CreateTableAsSelectStatement( - tableName, - logicalPlan, + CreateTableAsSelect( + UnresolvedDBObjectName(tableName, isNamespace = false), partitioning.getOrElse(Seq.empty), - None, - properties.toMap, - provider, - Map.empty, - None, - None, + logicalPlan, + tableSpec, options.toMap, - None, - ifNotExists = false, - external = false)) + false)) } override def replace(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 6f41497..d0c9de7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -163,26 +163,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) tableSpec = newTableSpec) } - case c @ CreateTableAsSelectStatement( - SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => + case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) + if isSessionCatalog(catalog) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.provider, c.options, c.location, c.serde, ctas = true) + c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde, + ctas = true) if (!isV2Provider(provider)) { - val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType, - c.partitioning, c.bucketSpec, c.properties, provider, c.location, - c.comment, storageFormat, c.external) - val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - CreateTableV1(tableDesc, mode, Some(c.asSelect)) + val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType, + c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider, + c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external) + val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTableV1(tableDesc, mode, Some(c.query)) } else { - CreateTableAsSelect( - catalog.asTableCatalog, - tbl.asIdentifier, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - c.asSelect, - convertTableProperties(c), - writeOptions = c.writeOptions, - ignoreIfExists = c.ifNotExists) + val newTableSpec = c.tableSpec.copy(bucketSpec = None) + c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform), + tableSpec = newTableSpec) } case RefreshTable(ResolvedV1TableIdentifier(ident)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 8a82f36..3355403 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -169,16 +169,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil - case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => - val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) + case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, + options, ifNotExists) => val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => - AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query), - propsWithOwner, writeOptions, ifNotExists) :: Nil + AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query), + tableSpec, writeOptions, ifNotExists) :: Nil case _ => - CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query), - propsWithOwner, writeOptions, ifNotExists) :: Nil + CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query, + planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil } case RefreshTable(r: ResolvedTable) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index c61ef56..65c4928 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -71,10 +71,12 @@ case class CreateTableAsSelectExec( partitioning: Seq[Transform], plan: LogicalPlan, query: SparkPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { + val properties = CatalogV2Util.convertTableProperties(tableSpec) + override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { if (ifNotExists) { @@ -109,10 +111,12 @@ case class AtomicCreateTableAsSelectExec( partitioning: Seq[Transform], plan: LogicalPlan, query: SparkPlan, - properties: Map[String, String], + tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends TableWriteExecHelper { + val properties = CatalogV2Util.convertTableProperties(tableSpec) + override protected def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { if (ifNotExists) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 8e8eb85..15a25c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -46,12 +46,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.identity(ref) :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) @@ -69,12 +70,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => + val tableSpec = TableSpec(None, Map.empty, None, Map.empty, + None, None, None, false) val plan = CreateTableAsSelect( - catalog, - Identifier.of(Array(), "table_name"), + UnresolvedDBObjectName(Array("table_name"), isNamespace = false), Expressions.bucket(4, ref) :: Nil, TestRelation2, - Map.empty, + tableSpec, Map.empty, ignoreIfExists = false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index a6b979a3..5862acf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -563,20 +563,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert( + ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name" + ) assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) @@ -598,20 +590,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql, withDefault = true) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == "testcat") - assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert( + ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name" + ) assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) @@ -633,18 +617,12 @@ class PlanResolutionSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> v2Format, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") - parseAndResolve(sql) match { case ctas: CreateTableAsSelect => - assert(ctas.catalog.name == CatalogManager.SESSION_CATALOG_NAME) - assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view")) - assert(ctas.properties == expectedProperties) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == + CatalogManager.SESSION_CATALOG_NAME) + assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.page_view") assert(ctas.writeOptions.isEmpty) assert(ctas.partitioning.isEmpty) assert(ctas.ignoreIfExists) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org