This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 688fa23  [SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command
688fa23 is described below

commit 688fa239265b981dc3acc69b79443905afe6a8cf
Author: dch nguyen <dgd_contribu...@viettel.com.vn>
AuthorDate: Fri Dec 3 21:08:41 2021 +0800

    [SPARK-36902][SQL] Migrate CreateTableAsSelectStatement to v2 command
    
    ### What changes were proposed in this pull request?
    Migrate CreateTableAsSelectStatement to v2 command
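    For reference, the shape of the new v2 node and how the parser now builds it
    (excerpted from the diff below; not a standalone example):
    
    ```scala
    // v2Commands.scala: CTAS becomes a BinaryCommand whose left child is the table name
    // and whose right child is the query; table metadata moves into a TableSpec.
    case class CreateTableAsSelect(
        name: LogicalPlan,
        partitioning: Seq[Transform],
        query: LogicalPlan,
        tableSpec: TableSpec,
        writeOptions: Map[String, String],
        ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan
    
    // AstBuilder.scala: the parser constructs the v2 command directly; the name is
    // resolved later through UnresolvedDBObjectName / ResolvedDBObjectName.
    val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
      serdeInfo, external)
    CreateTableAsSelect(
      UnresolvedDBObjectName(table, isNamespace = false),
      partitioning, query, tableSpec, Map.empty, ifNotExists)
    ```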
    
    ### Why are the changes needed?
    This continues the effort to migrate parsed-only DDL statements to v2 commands, so that CREATE TABLE AS SELECT is resolved through the same UnresolvedDBObjectName / ResolvedDBObjectName path as the other v2 commands and the CreateTableAsSelectStatement node can be removed.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    existing tests
    
    Closes #34667 from dchvn/migrate-CTAS.
    
    Authored-by: dch nguyen <dgd_contribu...@viettel.com.vn>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 12 -----
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 12 ++---
 .../sql/catalyst/plans/logical/statements.scala    | 23 ---------
 .../sql/catalyst/plans/logical/v2Commands.scala    | 23 ++++++---
 .../sql/connector/catalog/CatalogV2Util.scala      |  7 +--
 .../CreateTablePartitioningValidationSuite.scala   | 54 ++++++++++++----------
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 26 +++++------
 .../org/apache/spark/sql/DataFrameWriter.scala     | 48 +++++++++++--------
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 28 +++++------
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 29 +++++-------
 .../datasources/v2/DataSourceV2Strategy.scala      | 12 ++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  8 +++-
 .../connector/V2CommandsCaseSensitivitySuite.scala | 14 +++---
 .../execution/command/PlanResolutionSuite.scala    | 46 +++++-------------
 14 files changed, 155 insertions(+), 187 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index d7c6301..3e21a60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -37,18 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
    case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
       ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())
 
-    case c @ CreateTableAsSelectStatement(
-         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
-      CreateTableAsSelect(
-        catalog.asTableCatalog,
-        tbl.asIdentifier,
-        // convert the bucket spec and add it as a transform
-        c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        c.asSelect,
-        convertTableProperties(c),
-        writeOptions = c.writeOptions,
-        ignoreIfExists = c.ifNotExists)
-
     case c @ ReplaceTableStatement(
-         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       ReplaceTable(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f7d96f8..235f6a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan.
+   * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelect]] logical plan.
    *
    * Expected format:
    * {{{
@@ -3456,6 +3456,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }
 
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
+    val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
+      serdeInfo, external)
 
     Option(ctx.query).map(plan) match {
       case Some(_) if columns.nonEmpty =>
@@ -3470,15 +3472,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
           ctx)
 
       case Some(query) =>
-        CreateTableAsSelectStatement(
-          table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
-          writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
+        CreateTableAsSelect(
+          UnresolvedDBObjectName(table, isNamespace = false),
+          partitioning, query, tableSpec, Map.empty, ifNotExists)
 
       case _ =>
        // Note: table schema includes both the table columns list and the partition columns
         // with data type.
-        val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
-          serdeInfo, external)
         val schema = StructType(columns ++ partCols)
         CreateTable(
           UnresolvedDBObjectName(table, isNamespace = false),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 70c6f15..20d6894 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -124,29 +124,6 @@ object SerdeInfo {
 }
 
 /**
- * A CREATE TABLE AS SELECT command, as parsed from SQL.
- */
-case class CreateTableAsSelectStatement(
-    tableName: Seq[String],
-    asSelect: LogicalPlan,
-    partitioning: Seq[Transform],
-    bucketSpec: Option[BucketSpec],
-    properties: Map[String, String],
-    provider: Option[String],
-    options: Map[String, String],
-    location: Option[String],
-    comment: Option[String],
-    writeOptions: Map[String, String],
-    serde: Option[SerdeInfo],
-    external: Boolean,
-    ifNotExists: Boolean) extends UnaryParsedStatement {
-
-  override def child: LogicalPlan = asSelect
-  override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelectStatement =
-    copy(asSelect = newChild)
-}
-
-/**
  * A REPLACE TABLE command, as parsed from SQL.
  *
  * If the table exists prior to running this command, executing this statement
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d9e5dfe..7428da3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -220,16 +220,22 @@ case class CreateTable(
  * Create a new table from a select query with a v2 catalog.
  */
 case class CreateTableAsSelect(
-    catalog: TableCatalog,
-    tableName: Identifier,
+    name: LogicalPlan,
     partitioning: Seq[Transform],
     query: LogicalPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
+    ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
   override def tableSchema: StructType = query.schema
-  override def child: LogicalPlan = query
+  override def left: LogicalPlan = name
+  override def right: LogicalPlan = query
+
+  override def tableName: Identifier = {
+    assert(left.resolved)
+    left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+  }
 
   override lazy val resolved: Boolean = childrenResolved && {
    // the table schema is created from the query schema, so the only resolution needed is to check
@@ -242,8 +248,11 @@ case class CreateTableAsSelect(
     this.copy(partitioning = rewritten)
   }
 
-  override protected def withNewChildInternal(newChild: LogicalPlan): CreateTableAsSelect =
-    copy(query = newChild)
+  override protected def withNewChildrenInternal(
+    newLeft: LogicalPlan,
+    newRight: LogicalPlan
+  ): CreateTableAsSelect =
+    copy(name = newLeft, query = newRight)
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 44e57f2..4c5ccc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,7 +23,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,11 +305,6 @@ private[sql] object CatalogV2Util {
     catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
   }
 
-  def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = {
-    convertTableProperties(
-      c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
-  }
-
   def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = {
    convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index aee8e31..41b22bc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -20,22 +20,22 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec}
+import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CreateTablePartitioningValidationSuite extends AnalysisTest {
-  import CreateTablePartitioningValidationSuite._
 
   test("CreateTableAsSelect: fail missing top-level column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "does_not_exist") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
@@ -46,12 +46,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail missing top-level column nested reference") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "does_not_exist.z") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
@@ -62,12 +63,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail missing nested column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "point.z") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
@@ -78,12 +80,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: fail with multiple errors") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
@@ -95,12 +98,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success with top-level column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "id") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
@@ -108,12 +112,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success using nested column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "point.x") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
@@ -121,12 +126,13 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
   }
 
   test("CreateTableAsSelect: success using complex column") {
+    val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+      None, None, None, false)
     val plan = CreateTableAsSelect(
-      catalog,
-      Identifier.of(Array(), "table_name"),
+      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
       Expressions.bucket(4, "point") :: Nil,
       TestRelation2,
-      Map.empty,
+      tableSpec,
       Map.empty,
       ignoreIfExists = false)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 182f028..ba0a70a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -719,8 +719,8 @@ class DDLParserSuite extends AnalysisTest {
     parsedPlan match {
       case create: CreateTable if newTableToken == "CREATE" =>
         assert(create.ignoreIfExists == expectedIfNotExists)
-      case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
-        assert(ctas.ifNotExists == expectedIfNotExists)
+      case ctas: CreateTableAsSelect if newTableToken == "CREATE" =>
+        assert(ctas.ignoreIfExists == expectedIfNotExists)
       case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
       case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
       case other =>
@@ -2310,19 +2310,19 @@ class DDLParserSuite extends AnalysisTest {
             replace.location,
             replace.comment,
             replace.serde)
-        case ctas: CreateTableAsSelectStatement =>
+        case ctas: CreateTableAsSelect =>
           TableSpec(
-            ctas.tableName,
-            Some(ctas.asSelect).filter(_.resolved).map(_.schema),
+            ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+            Some(ctas.query).filter(_.resolved).map(_.schema),
             ctas.partitioning,
-            ctas.bucketSpec,
-            ctas.properties,
-            ctas.provider,
-            ctas.options,
-            ctas.location,
-            ctas.comment,
-            ctas.serde,
-            ctas.external)
+            ctas.tableSpec.bucketSpec,
+            ctas.tableSpec.properties,
+            ctas.tableSpec.provider,
+            ctas.tableSpec.options,
+            ctas.tableSpec.location,
+            ctas.tableSpec.comment,
+            ctas.tableSpec.serde,
+            ctas.tableSpec.external)
         case rtas: ReplaceTableAsSelect =>
           TableSpec(
             rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d5d814d..8e2f9cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -326,15 +326,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
               val catalog = CatalogV2Util.getTableProviderCatalog(
                 supportsExtract, catalogManager, dsOptions)
 
-              val location = Option(dsOptions.get("path")).map(TableCatalog.PROP_LOCATION -> _)
-
+              val tableSpec = TableSpec(
+                bucketSpec = None,
+                properties = Map.empty,
+                provider = Some(source),
+                options = Map.empty,
+                location = extraOptions.get("path"),
+                comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+                serde = None,
+                external = false)
               runCommand(df.sparkSession) {
                 CreateTableAsSelect(
-                  catalog,
-                  ident,
+                  UnresolvedDBObjectName(
+                    catalog.name +: ident.namespace.toSeq :+ ident.name,
+                    isNamespace = false
+                  ),
                   partitioningAsV2,
                   df.queryExecution.analyzed,
-                  Map(TableCatalog.PROP_PROVIDER -> source) ++ location,
+                  tableSpec,
                   finalOptions,
                   ignoreIfExists = createMode == SaveMode.Ignore)
               }
@@ -607,20 +616,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
        // We have a potential race condition here in AppendMode, if the table suddenly gets
        // created between our existence check and physical execution, but this can't be helped
         // in any case.
-        CreateTableAsSelectStatement(
-          nameParts,
-          df.queryExecution.analyzed,
+        val tableSpec = TableSpec(
+          bucketSpec = None,
+          properties = Map.empty,
+          provider = Some(source),
+          options = Map.empty,
+          location = extraOptions.get("path"),
+          comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+          serde = None,
+          external = false)
+
+        CreateTableAsSelect(
+          UnresolvedDBObjectName(nameParts, isNamespace = false),
           partitioningAsV2,
-          None,
-          Map.empty,
-          Some(source),
+          df.queryExecution.analyzed,
+          tableSpec,
           Map.empty,
-          extraOptions.get("path"),
-          extraOptions.get(TableCatalog.PROP_COMMENT),
-          extraOptions.toMap,
-          None,
-          ifNotExists = other == SaveMode.Ignore,
-          external = false)
+          other == SaveMode.Ignore)
     }
 
     runCommand(df.sparkSession) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index b99195d..22b2eb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.IntegerType
@@ -107,21 +107,23 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   override def create(): Unit = {
+    val tableSpec = TableSpec(
+      bucketSpec = None,
+      properties = properties.toMap,
+      provider = provider,
+      options = Map.empty,
+      location = None,
+      comment = None,
+      serde = None,
+      external = false)
     runCommand(
-      CreateTableAsSelectStatement(
-        tableName,
-        logicalPlan,
+      CreateTableAsSelect(
+        UnresolvedDBObjectName(tableName, isNamespace = false),
         partitioning.getOrElse(Seq.empty),
-        None,
-        properties.toMap,
-        provider,
-        Map.empty,
-        None,
-        None,
+        logicalPlan,
+        tableSpec,
         options.toMap,
-        None,
-        ifNotExists = false,
-        external = false))
+        false))
   }
 
   override def replace(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 6f41497..d0c9de7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -163,26 +163,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           tableSpec = newTableSpec)
       }
 
-    case c @ CreateTableAsSelectStatement(
-         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
+    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
+      if isSessionCatalog(catalog) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.provider, c.options, c.location, c.serde, ctas = true)
+        c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
+        ctas = true)
       if (!isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
-          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
-          c.comment, storageFormat, c.external)
-        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTableV1(tableDesc, mode, Some(c.asSelect))
+        val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType,
+          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
+          c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external)
+        val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+        CreateTableV1(tableDesc, mode, Some(c.query))
       } else {
-        CreateTableAsSelect(
-          catalog.asTableCatalog,
-          tbl.asIdentifier,
-          // convert the bucket spec and add it as a transform
-          c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          c.asSelect,
-          convertTableProperties(c),
-          writeOptions = c.writeOptions,
-          ignoreIfExists = c.ifNotExists)
+        val newTableSpec = c.tableSpec.copy(bucketSpec = None)
+        c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
+          tableSpec = newTableSpec)
       }
 
     case RefreshTable(ResolvedV1TableIdentifier(ident)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 8a82f36..3355403 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -169,16 +169,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
         partitioning, tableSpec.copy(location = qualifiedLocation), 
ifNotExists) :: Nil
 
-    case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
-      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
+    case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
+        options, ifNotExists) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
-            propsWithOwner, writeOptions, ifNotExists) :: Nil
+          AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query),
+            tableSpec, writeOptions, ifNotExists) :: Nil
         case _ =>
-          CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
-            propsWithOwner, writeOptions, ifNotExists) :: Nil
+          CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query,
+            planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil
       }
 
     case RefreshTable(r: ResolvedTable) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c61ef56..65c4928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -71,10 +71,12 @@ case class CreateTableAsSelectExec(
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: CaseInsensitiveStringMap,
     ifNotExists: Boolean) extends TableWriteExecHelper {
 
+  val properties = CatalogV2Util.convertTableProperties(tableSpec)
+
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       if (ifNotExists) {
@@ -109,10 +111,12 @@ case class AtomicCreateTableAsSelectExec(
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: CaseInsensitiveStringMap,
     ifNotExists: Boolean) extends TableWriteExecHelper {
 
+  val properties = CatalogV2Util.convertTableProperties(tableSpec)
+
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       if (ifNotExists) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 8e8eb85..15a25c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -46,12 +46,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
+          val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+            None, None, None, false)
           val plan = CreateTableAsSelect(
-            catalog,
-            Identifier.of(Array(), "table_name"),
+            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
             Expressions.identity(ref) :: Nil,
             TestRelation2,
-            Map.empty,
+            tableSpec,
             Map.empty,
             ignoreIfExists = false)
 
@@ -69,12 +70,13 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
+          val tableSpec = TableSpec(None, Map.empty, None, Map.empty,
+            None, None, None, false)
           val plan = CreateTableAsSelect(
-            catalog,
-            Identifier.of(Array(), "table_name"),
+            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
             Expressions.bucket(4, ref) :: Nil,
             TestRelation2,
-            Map.empty,
+            tableSpec,
             Map.empty,
             ignoreIfExists = false)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index a6b979a3..5862acf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -563,20 +563,12 @@ class PlanResolutionSuite extends AnalysisTest {
          |AS SELECT * FROM src
       """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "option.other" -> "20",
-      "provider" -> "parquet",
-      "location" -> "s3://bucket/path/to/data",
-      "comment" -> "table comment",
-      "other" -> "20")
-
     parseAndResolve(sql) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.catalog.name == "testcat")
-        assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
-        assert(ctas.properties == expectedProperties)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(
+          ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+        )
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
@@ -598,20 +590,12 @@ class PlanResolutionSuite extends AnalysisTest {
          |AS SELECT * FROM src
       """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "option.other" -> "20",
-      "provider" -> "parquet",
-      "location" -> "s3://bucket/path/to/data",
-      "comment" -> "table comment",
-      "other" -> "20")
-
     parseAndResolve(sql, withDefault = true) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.catalog.name == "testcat")
-        assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
-        assert(ctas.properties == expectedProperties)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(
+          ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+        )
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
@@ -633,18 +617,12 @@ class PlanResolutionSuite extends AnalysisTest {
         |AS SELECT * FROM src
       """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "provider" -> v2Format,
-      "location" -> "/user/external/page_view",
-      "comment" -> "This is the staging page view table")
-
     parseAndResolve(sql) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.catalog.name == CatalogManager.SESSION_CATALOG_NAME)
-        assert(ctas.tableName == Identifier.of(Array("mydb"), "page_view"))
-        assert(ctas.properties == expectedProperties)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+          CatalogManager.SESSION_CATALOG_NAME)
+        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+          "mydb.page_view")
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
