This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 98b0c80  [SPARK-36850][SQL] Migrate CreateTableStatement to v2 command framework
98b0c80 is described below

commit 98b0c80adba6c5c0b2ecdf84ff9de2abef6b1a39
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Tue Nov 30 12:47:57 2021 +0800

    [SPARK-36850][SQL] Migrate CreateTableStatement to v2 command framework
    
    ### What changes were proposed in this pull request?
    Migrate `CreateTableStatement` to the v2 command framework.
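
    A condensed sketch of the new plan shape, drawn from the v2Commands.scala changes in this patch: a parsed CREATE TABLE now becomes a `CreateTable` node whose name is an `UnresolvedDBObjectName` child resolved by the analyzer, with the remaining table metadata grouped into a `TableSpec`. Both live in `org.apache.spark.sql.catalyst.plans.logical` next to the other v2 command nodes.

    ```scala
    // New v2 command: the table name is a child plan, resolved by the analyzer
    // into a ResolvedDBObjectName (catalog + multi-part name).
    case class CreateTable(
        name: LogicalPlan,
        tableSchema: StructType,
        partitioning: Seq[Transform],
        tableSpec: TableSpec,
        ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan

    // Table metadata that was previously flattened into CreateTableStatement's fields.
    case class TableSpec(
        bucketSpec: Option[BucketSpec],
        properties: Map[String, String],
        provider: Option[String],
        options: Map[String, String],
        location: Option[String],
        comment: Option[String],
        serde: Option[SerdeInfo],
        external: Boolean)
    ```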
    
    ### Why are the changes needed?
    Migrate to the standard V2 framework
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    existing tests
    
    Closes #34060 from huaxingao/create_table.
    
    Lead-authored-by: Huaxin Gao <huaxin_...@apple.com>
    Co-authored-by: Huaxin Gao <huaxin.ga...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  11 ---
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   9 +-
 .../sql/catalyst/plans/logical/statements.scala    |  19 ----
 .../sql/catalyst/plans/logical/v2Commands.scala    |  34 +++++--
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |   4 +
 .../sql/connector/catalog/CatalogV2Util.scala      |   9 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  24 ++---
 .../catalyst/analysis/ReplaceCharWithVarchar.scala |   4 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  32 +++---
 .../spark/sql/execution/datasources/rules.scala    |   9 +-
 .../execution/datasources/v2/CreateTableExec.scala |  13 ++-
 .../datasources/v2/DataSourceV2Strategy.scala      |   7 +-
 .../spark/sql/streaming/DataStreamWriter.scala     |  19 ++--
 .../execution/command/PlanResolutionSuite.scala    | 110 ++++++++-------------
 14 files changed, 141 insertions(+), 163 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index efc1ab2..83d7b93 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -37,17 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
       ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())
 
-    case c @ CreateTableStatement(
-         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      CreateV2Table(
-        catalog.asTableCatalog,
-        tbl.asIdentifier,
-        c.tableSchema,
-        // convert the bucket spec and add it as a transform
-        c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        convertTableProperties(c),
-        ignoreIfExists = c.ifNotExists)
-
     case c @ CreateTableAsSelectStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
       CreateTableAsSelect(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 90c1e39..2c32ba7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3410,7 +3410,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a table, returning a [[CreateTableStatement]] logical plan.
+   * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan.
    *
    * Expected format:
    * {{{
@@ -3477,9 +3477,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
         // with data type.
+        val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
+          serdeInfo, external)
         val schema = StructType(columns ++ partCols)
-        CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
-          options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
+        CreateTable(
+          UnresolvedDBObjectName(table, isNamespace = false),
+          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index ccc4e19..a6ed304 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -124,25 +124,6 @@ object SerdeInfo {
 }
 
 /**
- * A CREATE TABLE command, as parsed from SQL.
- *
- * This is a metadata-only command and is not used to write data to the created table.
- */
-case class CreateTableStatement(
-    tableName: Seq[String],
-    tableSchema: StructType,
-    partitioning: Seq[Transform],
-    bucketSpec: Option[BucketSpec],
-    properties: Map[String, String],
-    provider: Option[String],
-    options: Map[String, String],
-    location: Option[String],
-    comment: Option[String],
-    serde: Option[SerdeInfo],
-    external: Boolean,
-    ifNotExists: Boolean) extends LeafParsedStatement
-
-/**
  * A CREATE TABLE AS SELECT command, as parsed from SQL.
  */
 case class CreateTableAsSelectStatement(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 4ed5d87..d39e288 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -193,13 +194,24 @@ trait V2CreateTablePlan extends LogicalPlan {
 /**
  * Create a new table with a v2 catalog.
  */
-case class CreateV2Table(
-    catalog: TableCatalog,
-    tableName: Identifier,
+case class CreateTable(
+    name: LogicalPlan,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    properties: Map[String, String],
-    ignoreIfExists: Boolean) extends LeafCommand with V2CreateTablePlan {
+    tableSpec: TableSpec,
+    ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+
+  override def child: LogicalPlan = name
+
+  override def tableName: Identifier = {
+    assert(child.resolved)
+    child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
+    copy(name = newChild)
+
   override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
     this.copy(partitioning = rewritten)
   }
@@ -1090,3 +1102,13 @@ case class DropIndex(
   override protected def withNewChildInternal(newChild: LogicalPlan): DropIndex =
     copy(table = newChild)
 }
+
+case class TableSpec(
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: Option[String],
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    serde: Option[SerdeInfo],
+    external: Boolean)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index f3f6744..3d62cf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.RuleId
 import org.apache.spark.sql.catalyst.rules.RuleIdCollection
@@ -819,6 +820,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
       redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
     case map: Map[_, _] =>
       redactMapString(map, maxFields)
+    case t: TableSpec =>
+      t.copy(properties = Utils.redact(t.properties).toMap,
+        options = Utils.redact(t.options).toMap) :: Nil
     case table: CatalogTable =>
       table.storage.serde match {
         case Some(serde) => table.identifier :: serde :: Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 7c11463..fabc73f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,7 +23,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,11 +305,6 @@ private[sql] object CatalogV2Util {
     catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
   }
 
-  def convertTableProperties(c: CreateTableStatement): Map[String, String] = {
-    convertTableProperties(
-      c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
-  }
-
   def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = {
     convertTableProperties(
       c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
@@ -323,7 +318,7 @@ private[sql] object CatalogV2Util {
     convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
   }
 
-  private def convertTableProperties(
+  def convertTableProperties(
       properties: Map[String, String],
       options: Map[String, String],
       serdeInfo: Option[SerdeInfo],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 70227bb..f4ab807 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -717,8 +717,8 @@ class DDLParserSuite extends AnalysisTest {
     val parsedPlan = parsePlan(sqlStatement)
     val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT)
     parsedPlan match {
-      case create: CreateTableStatement if newTableToken == "CREATE" =>
-        assert(create.ifNotExists == expectedIfNotExists)
+      case create: CreateTable if newTableToken == "CREATE" =>
+        assert(create.ignoreIfExists == expectedIfNotExists)
       case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
         assert(ctas.ifNotExists == expectedIfNotExists)
       case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
@@ -2285,19 +2285,19 @@ class DDLParserSuite extends AnalysisTest {
   private object TableSpec {
     def apply(plan: LogicalPlan): TableSpec = {
       plan match {
-        case create: CreateTableStatement =>
+        case create: CreateTable =>
           TableSpec(
-            create.tableName,
+            create.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
             Some(create.tableSchema),
             create.partitioning,
-            create.bucketSpec,
-            create.properties,
-            create.provider,
-            create.options,
-            create.location,
-            create.comment,
-            create.serde,
-            create.external)
+            create.tableSpec.bucketSpec,
+            create.tableSpec.properties,
+            create.tableSpec.provider,
+            create.tableSpec.options,
+            create.tableSpec.location,
+            create.tableSpec.comment,
+            create.tableSpec.serde,
+            create.tableSpec.external)
         case replace: ReplaceTableStatement =>
           TableSpec(
             replace.tableName,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala
index 7404a30..3f9eb5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateV2Table, LogicalPlan, ReplaceColumns, ReplaceTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateTable, LogicalPlan, ReplaceColumns, ReplaceTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, CreateDataSourceTableCommand, CreateTableCommand}
@@ -31,7 +31,7 @@ object ReplaceCharWithVarchar extends Rule[LogicalPlan] {
 
     plan.resolveOperators {
       // V2 commands
-      case cmd: CreateV2Table =>
+      case cmd: CreateTable =>
         cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema))
       case cmd: ReplaceTable =>
         cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5362b6b..0940982 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
@@ -143,25 +144,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ CreateTableStatement(
-         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
+    case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.provider, c.options, c.location, c.serde, ctas = false)
-      if (!isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
-          c.partitioning, c.bucketSpec, c.properties, provider, c.location,
-          c.comment, storageFormat, c.external)
-        val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+        c.tableSpec.provider,
+        c.tableSpec.options,
+        c.tableSpec.location,
+        c.tableSpec.serde,
+        ctas = false)
+      if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+        val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema,
+          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
+          c.tableSpec.location, c.tableSpec.comment, storageFormat,
+          c.tableSpec.external)
+        val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTable(tableDesc, mode, None)
       } else {
-        CreateV2Table(
-          catalog.asTableCatalog,
-          tbl.asIdentifier,
-          c.tableSchema,
-          // convert the bucket spec and add it as a transform
-          c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          convertTableProperties(c),
-          ignoreIfExists = c.ifNotExists)
+        val newTableSpec = c.tableSpec.copy(bucketSpec = None)
+        c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
+          tableSpec = newTableSpec)
       }
 
     case c @ CreateTableAsSelectStatement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 0e8efb6..327d926 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types.{AtomicType, StructType}
@@ -81,7 +82,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     // bucketing information is specified, as we can't infer bucketing from data files currently.
     // Since the runtime inferred partition columns could be different from what user specified,
     // we fail the query if the partitioning information is specified.
-    case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+    case c @ CreateTableV1(tableDesc, _, None) if tableDesc.schema.isEmpty =>
       if (tableDesc.bucketSpec.isDefined) {
         failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
           "when creating and will be inferred at runtime")
@@ -96,7 +97,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     // When we append data to an existing table, check if the given provider, partition columns,
     // bucket spec, etc. match the existing table, and adjust the columns order of the given query
     // if necessary.
-    case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
+    case c @ CreateTableV1(tableDesc, SaveMode.Append, Some(query))
         if query.resolved && catalog.tableExists(tableDesc.identifier) =>
       // This is guaranteed by the parser and `DataFrameWriter`
       assert(tableDesc.provider.isDefined)
@@ -189,7 +190,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     //   * partition columns' type must be AtomicType.
     //   * sort columns' type must be orderable.
     //   * reorder table schema or output of query plan, to put partition columns at the end.
-    case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) =>
+    case c @ CreateTableV1(tableDesc, _, query) if query.forall(_.resolved) =>
       if (query.isDefined) {
         assert(tableDesc.schema.isEmpty,
           "Schema may not be specified in a Create Table As Select (CTAS) statement")
@@ -433,7 +434,7 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
 object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
+      case CreateTableV1(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
         throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError(
           "CREATE Hive TABLE (AS SELECT)")
       case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
index be7331b..6e5c3af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
@@ -22,7 +22,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.TableSpec
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.StructType
@@ -32,10 +33,18 @@ case class CreateTableExec(
     identifier: Identifier,
     tableSchema: StructType,
     partitioning: Seq[Transform],
-    tableProperties: Map[String, String],
+    tableSpec: TableSpec,
     ignoreIfExists: Boolean) extends LeafV2CommandExec {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
+  val tableProperties = {
+    val props = CatalogV2Util.convertTableProperties(
+      tableSpec.properties, tableSpec.options, tableSpec.serde,
+      tableSpec.location, tableSpec.comment, tableSpec.provider,
+      tableSpec.external)
+    CatalogV2Util.withDefaultOwnership(props)
+  }
+
   override protected def run(): Seq[InternalRow] = {
     if (!catalog.tableExists(identifier)) {
       try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 026ff63..f64c1ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -165,9 +165,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
 
-    case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
-      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
-      CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
+    case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
+        tableSpec, ifNotExists) =>
+      CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
+        partitioning, tableSpec, ifNotExists) :: Nil
 
     case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
       val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 10ce9d3..2d3c898 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -27,8 +27,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.CreateTableStatement
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
@@ -288,10 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
        * Note, currently the new table creation by this API doesn't fully cover the V2 table.
        * TODO (SPARK-33638): Full support of v2 table creation
        */
-      val cmd = CreateTableStatement(
-        originalMultipartIdentifier,
-        df.schema.asNullable,
-        partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
+      val tableProperties = TableSpec(
         None,
         Map.empty[String, String],
         Some(source),
@@ -299,8 +297,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         extraOptions.get("path"),
         None,
         None,
-        external = false,
-        ifNotExists = false)
+        false)
+      val cmd = CreateTable(
+        UnresolvedDBObjectName(
+          originalMultipartIdentifier,
+          isNamespace = false),
+        df.schema.asNullable,
+        partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
+        tableProperties,
+        ignoreIfExists = false)
       Dataset.ofRows(df.sparkSession, cmd)
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 85ba14f..a6b979a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -26,17 +26,17 @@ import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.sources.SimpleScanSource
@@ -210,7 +210,7 @@ class PlanResolutionSuite extends AnalysisTest {
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parseAndResolve(sql).collect {
-      case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
+      case CreateTableV1(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
     }.head
   }
 
@@ -240,7 +240,7 @@ class PlanResolutionSuite extends AnalysisTest {
     )
 
     parseAndResolve(query) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -282,7 +282,7 @@ class PlanResolutionSuite extends AnalysisTest {
     )
 
     parseAndResolve(query) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -302,7 +302,7 @@ class PlanResolutionSuite extends AnalysisTest {
       comment = Some("abc"))
 
     parseAndResolve(sql) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -322,7 +322,7 @@ class PlanResolutionSuite extends AnalysisTest {
       properties = Map("test" -> "test"))
 
     parseAndResolve(sql) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -341,7 +341,7 @@ class PlanResolutionSuite extends AnalysisTest {
       provider = Some("parquet"))
 
     parseAndResolve(v1) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -372,7 +372,7 @@ class PlanResolutionSuite extends AnalysisTest {
       provider = Some("parquet"))
 
     parseAndResolve(sql) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -398,7 +398,7 @@ class PlanResolutionSuite extends AnalysisTest {
     )
 
     parseAndResolve(sql) match {
-      case CreateTable(tableDesc, _, None) =>
+      case CreateTableV1(tableDesc, _, None) =>
         assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -471,29 +471,20 @@ class PlanResolutionSuite extends AnalysisTest {
          |OPTIONS (path 's3://bucket/path/to/data', other 20)
       """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "option.other" -> "20",
-      "provider" -> "parquet",
-      "location" -> "s3://bucket/path/to/data",
-      "comment" -> "table comment",
-      "other" -> "20")
-
     parseAndResolve(sql) match {
-      case create: CreateV2Table =>
-        assert(create.catalog.name == "testcat")
-        assert(create.tableName == Identifier.of(Array("mydb"), "table_name"))
+      case create: CreateTable =>
+        assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+          "mydb.table_name")
         assert(create.tableSchema == new StructType()
             .add("id", LongType)
             .add("description", StringType)
             .add("point", new StructType().add("x", DoubleType).add("y", DoubleType)))
         assert(create.partitioning.isEmpty)
-        assert(create.properties == expectedProperties)
         assert(create.ignoreIfExists)
 
       case other =>
-        fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTable].getName} from query," +
             s"got ${other.getClass.getName}: $sql")
     }
   }
@@ -511,29 +502,20 @@ class PlanResolutionSuite extends AnalysisTest {
          |OPTIONS (path 's3://bucket/path/to/data', other 20)
       """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "option.other" -> "20",
-      "provider" -> "parquet",
-      "location" -> "s3://bucket/path/to/data",
-      "comment" -> "table comment",
-      "other" -> "20")
-
     parseAndResolve(sql, withDefault = true) match {
-      case create: CreateV2Table =>
-        assert(create.catalog.name == "testcat")
-        assert(create.tableName == Identifier.of(Array("mydb"), "table_name"))
+      case create: CreateTable =>
+        assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+          "mydb.table_name")
         assert(create.tableSchema == new StructType()
             .add("id", LongType)
             .add("description", StringType)
             .add("point", new StructType().add("x", DoubleType).add("y", DoubleType)))
         assert(create.partitioning.isEmpty)
-        assert(create.properties == expectedProperties)
         assert(create.ignoreIfExists)
 
       case other =>
-        fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTable].getName} from query," +
             s"got ${other.getClass.getName}: $sql")
     }
   }
@@ -551,27 +533,21 @@ class PlanResolutionSuite extends AnalysisTest {
          |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
       """.stripMargin
 
-    val expectedProperties = Map(
-      "p1" -> "v1",
-      "p2" -> "v2",
-      "provider" -> v2Format,
-      "location" -> "/user/external/page_view",
-      "comment" -> "This is the staging page view table")
-
     parseAndResolve(sql) match {
-      case create: CreateV2Table =>
-        assert(create.catalog.name == CatalogManager.SESSION_CATALOG_NAME)
-        assert(create.tableName == Identifier.of(Array("mydb"), "page_view"))
+      case create: CreateTable =>
+        assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+          CatalogManager.SESSION_CATALOG_NAME)
+        assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+          "mydb.page_view")
         assert(create.tableSchema == new StructType()
             .add("id", LongType)
             .add("description", StringType)
             .add("point", new StructType().add("x", DoubleType).add("y", DoubleType)))
         assert(create.partitioning.isEmpty)
-        assert(create.properties == expectedProperties)
         assert(create.ignoreIfExists)
 
       case other =>
-        fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTable].getName} from query," +
             s"got ${other.getClass.getName}: $sql")
     }
   }
@@ -1684,9 +1660,9 @@ class PlanResolutionSuite extends AnalysisTest {
      */
     def normalizePlan(plan: LogicalPlan): LogicalPlan = {
       plan match {
-        case CreateTable(tableDesc, mode, query) =>
+        case CreateTableV1(tableDesc, mode, query) =>
           val newTableDesc = tableDesc.copy(createTime = -1L)
-          CreateTable(newTableDesc, mode, query)
+          CreateTableV1(newTableDesc, mode, query)
         case _ => plan // Don't transform
       }
     }
@@ -1707,8 +1683,8 @@ class PlanResolutionSuite extends AnalysisTest {
         partitionColumnNames: Seq[String] = Seq.empty,
         comment: Option[String] = None,
         mode: SaveMode = SaveMode.ErrorIfExists,
-        query: Option[LogicalPlan] = None): CreateTable = {
-      CreateTable(
+        query: Option[LogicalPlan] = None): CreateTableV1 = {
+      CreateTableV1(
         CatalogTable(
           identifier = TableIdentifier(table, database),
           tableType = tableType,
@@ -1790,7 +1766,7 @@ class PlanResolutionSuite extends AnalysisTest {
     allSources.foreach { s =>
       val query = s"CREATE TABLE my_tab STORED AS $s"
       parseAndResolve(query) match {
-        case ct: CreateTable =>
+        case ct: CreateTableV1 =>
           val hiveSerde = HiveSerDe.sourceToSerDe(s)
           assert(hiveSerde.isDefined)
           assert(ct.tableDesc.storage.serde ==
@@ -1809,14 +1785,14 @@ class PlanResolutionSuite extends AnalysisTest {
 
     // No conflicting serdes here, OK
     parseAndResolve(query1) match {
-      case parsed1: CreateTable =>
+      case parsed1: CreateTableV1 =>
         assert(parsed1.tableDesc.storage.serde == Some("anything"))
         assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
         assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
     }
 
     parseAndResolve(query2) match {
-      case parsed2: CreateTable =>
+      case parsed2: CreateTableV1 =>
         assert(parsed2.tableDesc.storage.serde ==
             Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
         assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
@@ -1832,7 +1808,7 @@ class PlanResolutionSuite extends AnalysisTest {
       val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
       if (supportedSources.contains(s)) {
         parseAndResolve(query) match {
-          case ct: CreateTable =>
+          case ct: CreateTableV1 =>
             val hiveSerde = HiveSerDe.sourceToSerDe(s)
             assert(hiveSerde.isDefined)
             assert(ct.tableDesc.storage.serde == Some("anything"))
@@ -1853,7 +1829,7 @@ class PlanResolutionSuite extends AnalysisTest {
       val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
       if (supportedSources.contains(s)) {
         parseAndResolve(query) match {
-          case ct: CreateTable =>
+          case ct: CreateTableV1 =>
             val hiveSerde = HiveSerDe.sourceToSerDe(s)
             assert(hiveSerde.isDefined)
             assert(ct.tableDesc.storage.serde == hiveSerde.get.serde
@@ -1870,14 +1846,14 @@ class PlanResolutionSuite extends AnalysisTest {
   test("create hive external table") {
     val withoutLoc = "CREATE EXTERNAL TABLE my_tab STORED AS parquet"
     parseAndResolve(withoutLoc) match {
-      case ct: CreateTable =>
+      case ct: CreateTableV1 =>
         assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
         assert(ct.tableDesc.storage.locationUri.isEmpty)
     }
 
     val withLoc = "CREATE EXTERNAL TABLE my_tab STORED AS parquet LOCATION '/something/anything'"
     parseAndResolve(withLoc) match {
-      case ct: CreateTable =>
+      case ct: CreateTableV1 =>
         assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
         assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything")))
     }
@@ -1897,7 +1873,7 @@ class PlanResolutionSuite extends AnalysisTest {
   test("create hive table - location implies external") {
     val query = "CREATE TABLE my_tab STORED AS parquet LOCATION '/something/anything'"
     parseAndResolve(query) match {
-      case ct: CreateTable =>
+      case ct: CreateTableV1 =>
         assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
         assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything")))
     }
@@ -2261,14 +2237,6 @@ class PlanResolutionSuite extends AnalysisTest {
     assert(e2.getMessage.contains("Operation not allowed"))
   }
 
-  test("create table - properties") {
-    val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')"
-    parsePlan(query) match {
-      case state: CreateTableStatement =>
-        assert(state.properties == Map("k1" -> "v1", "k2" -> "v2"))
-    }
-  }
-
   test("create table(hive) - everything!") {
     val query =
       """
