This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e3cc7  [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
34e3cc7 is described below

commit 34e3cc70602b5107ebeea3f99c7c41672107ca13
Author: Ryan Blue <b...@apache.org>
AuthorDate: Fri Mar 22 13:58:54 2019 -0700

    [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
    
    ## What changes were proposed in this pull request?
    
    This moves parsing `CREATE TABLE ... USING` statements into catalyst.
    Catalyst produces logical plans with the parsed information, and those
    plans are converted to v1 `DataSource` plans in `DataSourceResolution`.
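    
    For illustration, a minimal sketch of the new parse step (the table name
    and schema are arbitrary); this mirrors the new catalyst `DDLParserSuite`:
    
    ```scala
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.catalyst.plans.logical.sql.CreateTableStatement
    
    // The catalyst parser now returns a parsed-only plan carrying the raw SQL
    // information instead of a fully built v1 CatalogTable.
    CatalystSqlParser.parsePlan(
        "CREATE TABLE my_tab (a INT, b STRING) USING parquet") match {
      case create: CreateTableStatement =>
        assert(create.table.table == "my_tab")
        assert(create.provider == "parquet")
    }
    ```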
    
    This prepares for adding v2 create plans that should receive the
    information parsed from SQL without being translated to v1 plans first.
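    
    A hedged sketch of the conversion side, mirroring the new
    `DataSourceResolution` rule and its `PlanResolutionSuite` test (using a
    standalone `SQLConf` here is an assumption made for the example):
    
    ```scala
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution}
    import org.apache.spark.sql.internal.SQLConf
    
    // Providers still on the v1 write path are converted to the existing v1
    // CreateTable plan; v2 providers are left for a future v2 rule to match.
    val parsed = CatalystSqlParser.parsePlan("CREATE TABLE my_tab (a INT) USING parquet")
    DataSourceResolution(new SQLConf).apply(parsed) match {
      case CreateTable(tableDesc, mode, None) =>
        assert(tableDesc.provider.contains("parquet"))
        assert(mode == SaveMode.ErrorIfExists) // no IF NOT EXISTS in the statement
    }
    ```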
    
    This also makes it possible to keep the parsing logic in catalyst instead
    of splitting the parser across the abstract `AstBuilder` in catalyst and
    `SparkSqlParser` in core.
    
    For more information, see the [mailing list thread](https://lists.apache.org/thread.html/54f4e1929ceb9a2b0cac7cb058000feb8de5d6c667b2e0950804c613%3Cdev.spark.apache.org%3E).
    
    ## How was this patch tested?
    
    This relies on existing tests to catch regressions; it introduces no
    behavior changes.
    
    Closes #24029 from rdblue/SPARK-27108-add-parsed-create-logical-plans.
    
    Authored-by: Ryan Blue <b...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 192 ++++++++++++-
 .../plans/logical/sql/CreateTableStatement.scala   |  66 +++++
 .../plans/logical/sql/ParsedStatement.scala        |  44 +++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 318 +++++++++++++++++++++
 .../spark/sql/execution/SparkSqlParser.scala       | 233 ++-------------
 .../datasources/DataSourceResolution.scala         | 112 ++++++++
 .../sql/internal/BaseSessionStateBuilder.scala     |   1 +
 .../spark/sql/execution/SparkSqlParserSuite.scala  |  13 -
 .../sql/execution/command/DDLParserSuite.scala     | 251 +---------------
 .../execution/command/PlanResolutionSuite.scala    | 257 +++++++++++++++++
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   1 +
 11 files changed, 1030 insertions(+), 458 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38a61b8..52a5d2c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
     if (STRING == null) structField else structField.withComment(string(STRING))
   }
+
+  /**
+   * Create location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
+    string(ctx.STRING)
+  }
+
+  /**
+   * Create a [[BucketSpec]].
+   */
+  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
+    BucketSpec(
+      ctx.INTEGER_VALUE.getText.toInt,
+      visitIdentifierList(ctx.identifierList),
+      Option(ctx.orderedIdentifierList)
+          .toSeq
+          .flatMap(_.orderedIdentifier.asScala)
+          .map { orderedIdCtx =>
+            Option(orderedIdCtx.ordering).map(_.getText).foreach { dir =>
+              if (dir.toLowerCase(Locale.ROOT) != "asc") {
+                operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx)
+              }
+            }
+
+            orderedIdCtx.identifier.getText
+          })
+  }
+
+  /**
+   * Convert a table property list into a key-value map.
+   * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
+   */
+  override def visitTablePropertyList(
+      ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
+    val properties = ctx.tableProperty.asScala.map { property =>
+      val key = visitTablePropertyKey(property.key)
+      val value = visitTablePropertyValue(property.value)
+      key -> value
+    }
+    // Check for duplicate property names.
+    checkDuplicateKeys(properties, ctx)
+    properties.toMap
+  }
+
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   */
+  def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.collect { case (key, null) => key }
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", 
"]")}", ctx)
+    }
+    props
+  }
+
+  /**
+   * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
+   */
+  def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v != null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values should not be specified for key(s): ${badKeys.mkString("[", 
",", "]")}", ctx)
+    }
+    props.keys.toSeq
+  }
+
+  /**
+   * A table property key can either be String or a collection of dot separated elements. This
+   * function extracts the property key based on whether it's a string literal or a table property
+   * identifier.
+   */
+  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
+    if (key.STRING != null) {
+      string(key.STRING)
+    } else {
+      key.getText
+    }
+  }
+
+  /**
+   * A table property value can be String, Integer, Boolean or Decimal. This function extracts
+   * the property value based on whether it's a string, integer, boolean or decimal literal.
+   */
+  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
+    if (value == null) {
+      null
+    } else if (value.STRING != null) {
+      string(value.STRING)
+    } else if (value.booleanValue != null) {
+      value.getText.toLowerCase(Locale.ROOT)
+    } else {
+      value.getText
+    }
+  }
+
+  /**
+   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
+   */
+  type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
+
+  /**
+   * Validate a create table statement and return the [[TableIdentifier]].
+   */
+  override def visitCreateTableHeader(
+      ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) {
+    val temporary = ctx.TEMPORARY != null
+    val ifNotExists = ctx.EXISTS != null
+    if (temporary && ifNotExists) {
+      operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
+    }
+    (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null)
+  }
+
+  /**
+   * Create a table, returning a [[CreateTableStatement]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   USING table_provider
+   *   create_table_clauses
+   *   [[AS] select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [OPTIONS table_property_list]
+   *     [PARTITIONED BY (col_name, col_name, ...)]
+   *     [CLUSTERED BY (col_name, col_name, ...)
+   *       [SORTED BY (col_name [ASC|DESC], ...)]
+   *       INTO num_buckets BUCKETS
+   *     ]
+   *     [LOCATION path]
+   *     [COMMENT table_comment]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
+   * }}}
+   */
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+    if (external) {
+      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
+    }
+
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
+    val schema = Option(ctx.colTypeList()).map(createSchema)
+    val partitionCols: Seq[String] =
+      Option(ctx.partitionColumnNames).map(visitIdentifierList).getOrElse(Nil)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+    val provider = ctx.tableProvider.qualifiedName.getText
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+    val comment = Option(ctx.comment).map(string)
+
+    Option(ctx.query).map(plan) match {
+      case Some(_) if temp =>
+        operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", 
ctx)
+
+      case Some(_) if schema.isDefined =>
+        operationNotAllowed(
+          "Schema may not be specified in a Create Table As Select (CTAS) 
statement",
+          ctx)
+
+      case Some(query) =>
+        CreateTableAsSelectStatement(
+          table, query, partitionCols, bucketSpec, properties, provider, options, location, comment,
+          ifNotExists = ifNotExists)
+
+      case None if temp =>
+        // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser.
+        // Use CREATE TEMPORARY VIEW ... USING ... instead.
+        operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+
+      case _ =>
+        CreateTableStatement(table, schema.getOrElse(new StructType), partitionCols, bucketSpec,
+          properties, provider, options, location, comment, ifNotExists = ifNotExists)
+    }
+  }
+
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
new file mode 100644
index 0000000..c734968
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/CreateTableStatement.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A CREATE TABLE command, as parsed from SQL.
+ *
+ * This is a metadata-only command and is not used to write data to the created table.
+ */
+case class CreateTableStatement(
+    table: TableIdentifier,
+    tableSchema: StructType,
+    partitioning: Seq[String],
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: String,
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    ifNotExists: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq.empty
+}
+
+/**
+ * A CREATE TABLE AS SELECT command, as parsed from SQL.
+ */
+case class CreateTableAsSelectStatement(
+    table: TableIdentifier,
+    asSelect: LogicalPlan,
+    partitioning: Seq[String],
+    bucketSpec: Option[BucketSpec],
+    properties: Map[String, String],
+    provider: String,
+    options: Map[String, String],
+    location: Option[String],
+    comment: Option[String],
+    ifNotExists: Boolean) extends ParsedStatement {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def children: Seq[LogicalPlan] = Seq(asSelect)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala
new file mode 100644
index 0000000..510f2a1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A logical plan node that contains exactly what was parsed from SQL.
+ *
+ * This is used to hold information parsed from SQL when there are multiple implementations of a
+ * query or command. For example, CREATE TABLE may be implemented by different nodes for v1 and v2.
+ * Instead of parsing directly to a v1 CreateTable that keeps metadata in CatalogTable, and then
+ * converting that v1 metadata to the v2 equivalent, the sql [[CreateTableStatement]] plan is
+ * produced by the parser and converted once into both implementations.
+ *
+ * Parsed logical plans are not resolved because they must be converted to concrete logical plans.
+ *
+ * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible can
+ * be kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
+ */
+private[sql] abstract class ParsedStatement extends LogicalPlan {
+  // Redact properties and options when parsed nodes are used by generic methods like toString
+  override def productIterator: Iterator[Any] = super.productIterator.map {
+    case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]])
+    case other => other
+  }
+
+  final override lazy val resolved = false
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
new file mode 100644
index 0000000..dae8f58
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.AnalysisTest
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+class DDLParserSuite extends AnalysisTest {
+  import CatalystSqlParser._
+
+  private def intercept(sqlCommand: String, messages: String*): Unit = {
+    val e = intercept[ParseException](parsePlan(sqlCommand))
+    messages.foreach { message =>
+      assert(e.message.contains(message))
+    }
+  }
+
+  test("create table using - schema") {
+    val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING 
parquet"
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType()
+            .add("a", IntegerType, nullable = true, "test")
+            .add("b", StringType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+
+    intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING 
parquet",
+      "no viable alternative at input")
+  }
+
+  test("create table - with IF NOT EXISTS") {
+    val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING 
parquet"
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with partitioned by") {
+    val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
+        "USING parquet PARTITIONED BY (a)"
+
+    parsePlan(query) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType()
+            .add("a", IntegerType, nullable = true, "test")
+            .add("b", StringType))
+        assert(create.partitioning == Seq("a"))
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table - with bucket") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+        "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+
+    parsePlan(query) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b"))))
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table - with comment") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 
'abc'"
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.contains("abc"))
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with table properties") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet 
TBLPROPERTIES('test' = 'test')"
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties == Map("test" -> "test"))
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with location") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION 
'/tmp/file'"
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("my_tab"))
+        assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.contains("/tmp/file"))
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - byte length literal table name") {
+    val sql = "CREATE TABLE 1m.2g(a INT) USING parquet"
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("2g", Some("1m")))
+        assert(create.tableSchema == new StructType().add("a", IntegerType))
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "parquet")
+        assert(create.options.isEmpty)
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("Duplicate clauses - create table") {
+    def createTableHeader(duplicateClause: String): String = {
+      s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause 
$duplicateClause"
+    }
+
+    intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"),
+      "Found duplicate clauses: TBLPROPERTIES")
+    intercept(createTableHeader("LOCATION '/tmp/file'"),
+      "Found duplicate clauses: LOCATION")
+    intercept(createTableHeader("COMMENT 'a table'"),
+      "Found duplicate clauses: COMMENT")
+    intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"),
+      "Found duplicate clauses: CLUSTERED BY")
+    intercept(createTableHeader("PARTITIONED BY (b)"),
+      "Found duplicate clauses: PARTITIONED BY")
+  }
+
+  test("support for other types in OPTIONS") {
+    val sql =
+      """
+        |CREATE TABLE table_name USING json
+        |OPTIONS (a 1, b 0.1, c TRUE)
+      """.stripMargin
+
+    parsePlan(sql) match {
+      case create: CreateTableStatement =>
+        assert(create.table == TableIdentifier("table_name"))
+        assert(create.tableSchema == new StructType)
+        assert(create.partitioning.isEmpty)
+        assert(create.bucketSpec.isEmpty)
+        assert(create.properties.isEmpty)
+        assert(create.provider == "json")
+        assert(create.options == Map("a" -> "1", "b" -> "0.1", "c" -> "true"))
+        assert(create.location.isEmpty)
+        assert(create.comment.isEmpty)
+        assert(!create.ifNotExists)
+
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableStatement].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("Test CTAS against native tables") {
+    val s1 =
+      """
+        |CREATE TABLE IF NOT EXISTS mydb.page_view
+        |USING parquet
+        |COMMENT 'This is the staging page view table'
+        |LOCATION '/user/external/page_view'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src
+      """.stripMargin
+
+    val s2 =
+      """
+        |CREATE TABLE IF NOT EXISTS mydb.page_view
+        |USING parquet
+        |LOCATION '/user/external/page_view'
+        |COMMENT 'This is the staging page view table'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src
+      """.stripMargin
+
+    val s3 =
+      """
+        |CREATE TABLE IF NOT EXISTS mydb.page_view
+        |USING parquet
+        |COMMENT 'This is the staging page view table'
+        |LOCATION '/user/external/page_view'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src
+      """.stripMargin
+
+    checkParsing(s1)
+    checkParsing(s2)
+    checkParsing(s3)
+
+    def checkParsing(sql: String): Unit = {
+      parsePlan(sql) match {
+        case create: CreateTableAsSelectStatement =>
+          assert(create.table == TableIdentifier("page_view", Some("mydb")))
+          assert(create.partitioning.isEmpty)
+          assert(create.bucketSpec.isEmpty)
+          assert(create.properties == Map("p1" -> "v1", "p2" -> "v2"))
+          assert(create.provider == "parquet")
+          assert(create.options.isEmpty)
+          assert(create.location.contains("/user/external/page_view"))
+          assert(create.comment.contains("This is the staging page view table"))
+          assert(create.ifNotExists)
+
+        case other =>
+          fail(s"Expected to parse 
${classOf[CreateTableAsSelectStatement].getClass.getName} " +
+              s"from query, got ${other.getClass.getName}: $sql")
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a9a5e6ec..735c0a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser._
@@ -377,127 +377,45 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }
 
   /**
-   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
-   */
-  type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
-
-  /**
-   * Validate a create table statement and return the [[TableIdentifier]].
-   */
-  override def visitCreateTableHeader(
-      ctx: CreateTableHeaderContext): TableHeader = withOrigin(ctx) {
-    val temporary = ctx.TEMPORARY != null
-    val ifNotExists = ctx.EXISTS != null
-    if (temporary && ifNotExists) {
-      operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx)
-    }
-    (visitTableIdentifier(ctx.tableIdentifier), temporary, ifNotExists, ctx.EXTERNAL != null)
-  }
-
-  /**
    * Create a table, returning a [[CreateTable]] logical plan.
    *
-   * Expected format:
-   * {{{
-   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   USING table_provider
-   *   create_table_clauses
-   *   [[AS] select_statement];
+   * This is used to produce CreateTempViewUsing from CREATE TEMPORARY TABLE.
    *
-   *   create_table_clauses (order insensitive):
-   *     [OPTIONS table_property_list]
-   *     [PARTITIONED BY (col_name, col_name, ...)]
-   *     [CLUSTERED BY (col_name, col_name, ...)
-   *       [SORTED BY (col_name [ASC|DESC], ...)]
-   *       INTO num_buckets BUCKETS
-   *     ]
-   *     [LOCATION path]
-   *     [COMMENT table_comment]
-   *     [TBLPROPERTIES (property_name=property_value, ...)]
-   * }}}
+   * TODO: Remove this. It is used because CreateTempViewUsing is not a Catalyst plan.
+   * Either move CreateTempViewUsing into catalyst as a parsed logical plan, or remove it because
+   * it is deprecated.
    */
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
-    if (external) {
-      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
-    }
-
-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val provider = ctx.tableProvider.qualifiedName.getText
-    val schema = Option(ctx.colTypeList()).map(createSchema)
-    val partitionColumnNames =
-      Option(ctx.partitionColumnNames)
-        .map(visitIdentifierList(_).toArray)
-        .getOrElse(Array.empty[String])
-    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-
-    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-    val storage = DataSource.buildStorageFormatFromOptions(options)
 
-    if (location.isDefined && storage.locationUri.isDefined) {
-      throw new ParseException(
-        "LOCATION and 'path' in OPTIONS are both used to indicate the custom 
table path, " +
-          "you can only specify one of them.", ctx)
-    }
-    val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI))
-
-    val tableType = if (customLocation.isDefined) {
-      CatalogTableType.EXTERNAL
+    if (!temp || ctx.query != null) {
+      super.visitCreateTable(ctx)
     } else {
-      CatalogTableType.MANAGED
-    }
-
-    val tableDesc = CatalogTable(
-      identifier = table,
-      tableType = tableType,
-      storage = storage.copy(locationUri = customLocation),
-      schema = schema.getOrElse(new StructType),
-      provider = Some(provider),
-      partitionColumnNames = partitionColumnNames,
-      bucketSpec = bucketSpec,
-      properties = properties,
-      comment = Option(ctx.comment).map(string))
-
-    // Determine the storage mode.
-    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-
-    if (ctx.query != null) {
-      // Get the backing query.
-      val query = plan(ctx.query)
-
-      if (temp) {
-        operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", 
ctx)
+      if (external) {
+        operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
       }
 
-      // Don't allow explicit specification of schema for CTAS
-      if (schema.nonEmpty) {
-        operationNotAllowed(
-          "Schema may not be specified in a Create Table As Select (CTAS) 
statement",
-          ctx)
-      }
-      CreateTable(tableDesc, mode, Some(query))
-    } else {
-      if (temp) {
-        if (ifNotExists) {
-          operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
-        }
+      checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+      checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+      checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+      checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+      checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+      checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
 
-        logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, 
please use " +
-          "CREATE TEMPORARY VIEW ... USING ... instead")
+      if (ifNotExists) {
         // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
         // IF NOT EXISTS. Users are not allowed to replace the existing temp table.
-        CreateTempViewUsing(table, schema, replace = false, global = false, provider, options)
-      } else {
-        CreateTable(tableDesc, mode, None)
+        operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
       }
+
+      val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+      val provider = ctx.tableProvider.qualifiedName.getText
+      val schema = Option(ctx.colTypeList()).map(createSchema)
+
+      logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please 
use " +
+          "CREATE TEMPORARY VIEW ... USING ... instead")
+
+      CreateTempViewUsing(table, schema, replace = false, global = false, provider, options)
     }
   }
 
@@ -563,77 +481,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }
 
   /**
-   * Convert a table property list into a key-value map.
-   * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
-   */
-  override def visitTablePropertyList(
-      ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
-    val properties = ctx.tableProperty.asScala.map { property =>
-      val key = visitTablePropertyKey(property.key)
-      val value = visitTablePropertyValue(property.value)
-      key -> value
-    }
-    // Check for duplicate property names.
-    checkDuplicateKeys(properties, ctx)
-    properties.toMap
-  }
-
-  /**
-   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
-   */
-  private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
-    val props = visitTablePropertyList(ctx)
-    val badKeys = props.collect { case (key, null) => key }
-    if (badKeys.nonEmpty) {
-      operationNotAllowed(
-        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", 
"]")}", ctx)
-    }
-    props
-  }
-
-  /**
-   * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified.
-   */
-  private def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = {
-    val props = visitTablePropertyList(ctx)
-    val badKeys = props.filter { case (_, v) => v != null }.keys
-    if (badKeys.nonEmpty) {
-      operationNotAllowed(
-        s"Values should not be specified for key(s): ${badKeys.mkString("[", 
",", "]")}", ctx)
-    }
-    props.keys.toSeq
-  }
-
-  /**
-   * A table property key can either be String or a collection of dot separated elements. This
-   * function extracts the property key based on whether its a string literal or a table property
-   * identifier.
-   */
-  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
-    if (key.STRING != null) {
-      string(key.STRING)
-    } else {
-      key.getText
-    }
-  }
-
-  /**
-   * A table property value can be String, Integer, Boolean or Decimal. This function extracts
-   * the property value based on whether its a string, integer, boolean or decimal literal.
-   */
-  override def visitTablePropertyValue(value: TablePropertyValueContext): String = {
-    if (value == null) {
-      null
-    } else if (value.STRING != null) {
-      string(value.STRING)
-    } else if (value.booleanValue != null) {
-      value.getText.toLowerCase(Locale.ROOT)
-    } else {
-      value.getText
-    }
-  }
-
-  /**
    * Create a [[CreateDatabaseCommand]] command.
    *
    * For example:
@@ -1007,34 +854,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }
 
   /**
-   * Create location string.
-   */
-  override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) {
-    string(ctx.STRING)
-  }
-
-  /**
-   * Create a [[BucketSpec]].
-   */
-  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) {
-    BucketSpec(
-      ctx.INTEGER_VALUE.getText.toInt,
-      visitIdentifierList(ctx.identifierList),
-      Option(ctx.orderedIdentifierList)
-        .toSeq
-        .flatMap(_.orderedIdentifier.asScala)
-        .map { orderedIdCtx =>
-          Option(orderedIdCtx.ordering).map(_.getText).foreach { dir =>
-            if (dir.toLowerCase(Locale.ROOT) != "asc") {
-              operationNotAllowed(s"Column ordering must be ASC, was '$dir'", 
ctx)
-            }
-          }
-
-          orderedIdCtx.identifier.getText
-        })
-  }
-
-  /**
    * Convert a nested constants list into a sequence of string sequences.
    */
   override def visitNestedConstantList(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
new file mode 100644
index 0000000..9fd44ea
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.Locale
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.TableProvider
+import org.apache.spark.sql.types.StructType
+
+case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case CreateTableStatement(
+        table, schema, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options,
+        location, comment, ifNotExists) =>
+
+      val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties,
+        provider, options, location, comment, ifNotExists)
+      val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
+      CreateTable(tableDesc, mode, None)
+
+    case CreateTableAsSelectStatement(
+        table, query, partitionCols, bucketSpec, properties, V1WriteProvider(provider), options,
+        location, comment, ifNotExists) =>
+
+      val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec,
+        properties, provider, options, location, comment, ifNotExists)
+      val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
+      CreateTable(tableDesc, mode, Some(query))
+  }
+
+  object V1WriteProvider {
+    private val v1WriteOverrideSet =
+      conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",").toSet
+
+    def unapply(provider: String): Option[String] = {
+      if (v1WriteOverrideSet.contains(provider.toLowerCase(Locale.ROOT))) {
+        Some(provider)
+      } else {
+        lazy val providerClass = DataSource.lookupDataSource(provider, conf)
+        provider match {
+          case _ if classOf[TableProvider].isAssignableFrom(providerClass) =>
+            None
+          case _ =>
+            Some(provider)
+        }
+      }
+    }
+  }
+
+  private def buildCatalogTable(
+      table: TableIdentifier,
+      schema: StructType,
+      partitionColumnNames: Seq[String],
+      bucketSpec: Option[BucketSpec],
+      properties: Map[String, String],
+      provider: String,
+      options: Map[String, String],
+      location: Option[String],
+      comment: Option[String],
+      ifNotExists: Boolean): CatalogTable = {
+
+    val storage = DataSource.buildStorageFormatFromOptions(options)
+    if (location.isDefined && storage.locationUri.isDefined) {
+      throw new AnalysisException(
+        "LOCATION and 'path' in OPTIONS are both used to indicate the custom 
table path, " +
+            "you can only specify one of them.")
+    }
+    val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI))
+
+    val tableType = if (customLocation.isDefined) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
+
+    CatalogTable(
+      identifier = table,
+      tableType = tableType,
+      storage = storage.copy(locationUri = customLocation),
+      schema = schema,
+      provider = Some(provider),
+      partitionColumnNames = partitionColumnNames,
+      bucketSpec = bucketSpec,
+      properties = properties,
+      comment = comment)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index f05aa51..d5543e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -161,6 +161,7 @@ abstract class BaseSessionStateBuilder(
       new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
         new FallbackOrcDataSourceV2(session) +:
+        DataSourceResolution(conf) +:
         customResolutionRules
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 425a96b..be3d079 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -215,19 +215,6 @@ class SparkSqlParserSuite extends AnalysisTest {
       "no viable alternative at input")
   }
 
-  test("create table using - schema") {
-    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING 
parquet",
-      createTableUsing(
-        table = "my_tab",
-        schema = (new StructType)
-          .add("a", IntegerType, nullable = true, "test")
-          .add("b", StringType)
-      )
-    )
-    intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING 
parquet",
-      "no viable alternative at input")
-  }
-
   test("create view as insert into table") {
     // Single insert query
     intercept("CREATE VIEW testView AS INSERT INTO jt VALUES(1, 1)",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index e0ccae1..d430eeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -415,173 +415,28 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything")))
   }
 
-  test("create table - with partitioned by") {
-    val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
-      "USING parquet PARTITIONED BY (a)"
-
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("my_tab"),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType()
-        .add("a", IntegerType, nullable = true, "test")
-        .add("b", StringType),
-      provider = Some("parquet"),
-      partitionColumnNames = Seq("a")
-    )
-
-    parser.parsePlan(query) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $query")
-    }
-  }
-
-  test("create table - with bucket") {
-    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
-      "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
-
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("my_tab"),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType().add("a", IntegerType).add("b", StringType),
-      provider = Some("parquet"),
-      bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b")))
-    )
-
-    parser.parsePlan(query) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $query")
-    }
-  }
-
-  test("create table - with comment") {
-    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 
'abc'"
-
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("my_tab"),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType().add("a", IntegerType).add("b", StringType),
-      provider = Some("parquet"),
-      comment = Some("abc"))
-
-    parser.parsePlan(sql) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $sql")
+  test("Duplicate clauses - create hive table") {
+    def createTableHeader(duplicateClause: String): String = {
+      s"CREATE TABLE my_tab(a INT, b STRING) STORED AS parquet 
$duplicateClause $duplicateClause"
     }
-  }
-
-  test("create table - with table properties") {
-    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet 
TBLPROPERTIES('test' = 'test')"
 
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("my_tab"),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType().add("a", IntegerType).add("b", StringType),
-      provider = Some("parquet"),
-      properties = Map("test" -> "test"))
-
-    parser.parsePlan(sql) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $sql")
-    }
-  }
-
-  test("Duplicate clauses - create table") {
-    def createTableHeader(duplicateClause: String, isNative: Boolean): String = {
-      val fileFormat = if (isNative) "USING parquet" else "STORED AS parquet"
-      s"CREATE TABLE my_tab(a INT, b STRING) $fileFormat $duplicateClause $duplicateClause"
-    }
-
-    Seq(true, false).foreach { isNative =>
-      intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')", isNative),
-        "Found duplicate clauses: TBLPROPERTIES")
-      intercept(createTableHeader("LOCATION '/tmp/file'", isNative),
-        "Found duplicate clauses: LOCATION")
-      intercept(createTableHeader("COMMENT 'a table'", isNative),
-        "Found duplicate clauses: COMMENT")
-      intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS", 
isNative),
-        "Found duplicate clauses: CLUSTERED BY")
-    }
-
-    // Only for native data source tables
-    intercept(createTableHeader("PARTITIONED BY (b)", isNative = true),
+    intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"),
+      "Found duplicate clauses: TBLPROPERTIES")
+    intercept(createTableHeader("LOCATION '/tmp/file'"),
+      "Found duplicate clauses: LOCATION")
+    intercept(createTableHeader("COMMENT 'a table'"),
+      "Found duplicate clauses: COMMENT")
+    intercept(createTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"),
+      "Found duplicate clauses: CLUSTERED BY")
+    intercept(createTableHeader("PARTITIONED BY (k int)"),
       "Found duplicate clauses: PARTITIONED BY")
-
-    // Only for Hive serde tables
-    intercept(createTableHeader("PARTITIONED BY (k int)", isNative = false),
-      "Found duplicate clauses: PARTITIONED BY")
-    intercept(createTableHeader("STORED AS parquet", isNative = false),
+    intercept(createTableHeader("STORED AS parquet"),
       "Found duplicate clauses: STORED AS/BY")
     intercept(
-      createTableHeader("ROW FORMAT SERDE 
'parquet.hive.serde.ParquetHiveSerDe'", isNative = false),
+      createTableHeader("ROW FORMAT SERDE 
'parquet.hive.serde.ParquetHiveSerDe'"),
       "Found duplicate clauses: ROW FORMAT")
   }
 
-  test("create table - with location") {
-    val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION 
'/tmp/file'"
-
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("my_tab"),
-      tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))),
-      schema = new StructType().add("a", IntegerType).add("b", StringType),
-      provider = Some("parquet"))
-
-    parser.parsePlan(v1) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $v1")
-    }
-
-    val v2 =
-      """
-        |CREATE TABLE my_tab(a INT, b STRING)
-        |USING parquet
-        |OPTIONS (path '/tmp/file')
-        |LOCATION '/tmp/file'
-      """.stripMargin
-    val e = intercept[ParseException] {
-      parser.parsePlan(v2)
-    }
-    assert(e.message.contains("you can only specify one of them."))
-  }
-
-  test("create table - byte length literal table name") {
-    val sql = "CREATE TABLE 1m.2g(a INT) USING parquet"
-
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("2g", Some("1m")),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType().add("a", IntegerType),
-      provider = Some("parquet"))
-
-    parser.parsePlan(sql) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $sql")
-    }
-  }
-
   test("insert overwrite directory") {
     val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as 
a"
     parser.parsePlan(v1) match {
@@ -1165,84 +1020,6 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     comparePlans(parsed, expected)
   }
 
-  test("support for other types in OPTIONS") {
-    val sql =
-      """
-        |CREATE TABLE table_name USING json
-        |OPTIONS (a 1, b 0.1, c TRUE)
-      """.stripMargin
-
-    val expectedTableDesc = CatalogTable(
-      identifier = TableIdentifier("table_name"),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty.copy(
-        properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true")
-      ),
-      schema = new StructType,
-      provider = Some("json")
-    )
-
-    parser.parsePlan(sql) match {
-      case CreateTable(tableDesc, _, None) =>
-        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $sql")
-    }
-  }
-
-  test("Test CTAS against data source tables") {
-    val s1 =
-      """
-        |CREATE TABLE IF NOT EXISTS mydb.page_view
-        |USING parquet
-        |COMMENT 'This is the staging page view table'
-        |LOCATION '/user/external/page_view'
-        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
-        |AS SELECT * FROM src
-      """.stripMargin
-
-    val s2 =
-      """
-        |CREATE TABLE IF NOT EXISTS mydb.page_view
-        |USING parquet
-        |LOCATION '/user/external/page_view'
-        |COMMENT 'This is the staging page view table'
-        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
-        |AS SELECT * FROM src
-      """.stripMargin
-
-    val s3 =
-      """
-        |CREATE TABLE IF NOT EXISTS mydb.page_view
-        |USING parquet
-        |COMMENT 'This is the staging page view table'
-        |LOCATION '/user/external/page_view'
-        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
-        |AS SELECT * FROM src
-      """.stripMargin
-
-    checkParsing(s1)
-    checkParsing(s2)
-    checkParsing(s3)
-
-    def checkParsing(sql: String): Unit = {
-      val (desc, exists) = extractTableDesc(sql)
-      assert(exists)
-      assert(desc.identifier.database == Some("mydb"))
-      assert(desc.identifier.table == "page_view")
-      assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
-      assert(desc.schema.isEmpty) // will be populated later when the table is actually created
-      assert(desc.comment == Some("This is the staging page view table"))
-      assert(desc.viewText.isEmpty)
-      assert(desc.viewDefaultDatabase.isEmpty)
-      assert(desc.viewQueryColumnNames.isEmpty)
-      assert(desc.partitionColumnNames.isEmpty)
-      assert(desc.provider == Some("parquet"))
-      assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
-    }
-  }
-
   test("Test CTAS #1") {
     val s1 =
       """
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
new file mode 100644
index 0000000..89c5df0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.AnalysisTest
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+class PlanResolutionSuite extends AnalysisTest {
+  import CatalystSqlParser._
+
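+  // Parse with the catalyst parser, then apply DataSourceResolution so parsed
+  // CREATE TABLE statements are converted into v1 CreateTable plans.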
+  def parseAndResolve(query: String): LogicalPlan = {
+    DataSourceResolution(conf).apply(parsePlan(query))
+  }
+
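+  // Extract the CatalogTable from the resolved CreateTable plan; the boolean is
+  // true when the statement used IF NOT EXISTS (parsed as SaveMode.Ignore).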
+  private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
+    parseAndResolve(sql).collect {
+      case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
+    }.head
+  }
+
+  test("create table - with partitioned by") {
+    val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
+        "USING parquet PARTITIONED BY (a)"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+          .add("a", IntegerType, nullable = true, "test")
+          .add("b", StringType),
+      provider = Some("parquet"),
+      partitionColumnNames = Seq("a")
+    )
+
+    parseAndResolve(query) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table - with bucket") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+        "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"),
+      bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b")))
+    )
+
+    parseAndResolve(query) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table - with comment") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 
'abc'"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"),
+      comment = Some("abc"))
+
+    parseAndResolve(sql) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with table properties") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet 
TBLPROPERTIES('test' = 'test')"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"),
+      properties = Map("test" -> "test"))
+
+    parseAndResolve(sql) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with location") {
+    val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION 
'/tmp/file'"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))),
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"))
+
+    parseAndResolve(v1) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $v1")
+    }
+
+    val v2 =
+      """
+        |CREATE TABLE my_tab(a INT, b STRING)
+        |USING parquet
+        |OPTIONS (path '/tmp/file')
+        |LOCATION '/tmp/file'
+      """.stripMargin
+    val e = intercept[AnalysisException] {
+      parseAndResolve(v2)
+    }
+    assert(e.message.contains("you can only specify one of them."))
+  }
+
+  test("create table - byte length literal table name") {
+    val sql = "CREATE TABLE 1m.2g(a INT) USING parquet"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("2g", Some("1m")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType),
+      provider = Some("parquet"))
+
+    parseAndResolve(sql) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("support for other types in OPTIONS") {
+    val sql =
+      """
+        |CREATE TABLE table_name USING json
+        |OPTIONS (a 1, b 0.1, c TRUE)
+      """.stripMargin
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("table_name"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(
+        properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true")
+      ),
+      schema = new StructType,
+      provider = Some("json")
+    )
+
+    parseAndResolve(sql) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+            s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("Test CTAS against data source tables") {
+    val s1 =
+      """
+        |CREATE TABLE IF NOT EXISTS mydb.page_view
+        |USING parquet
+        |COMMENT 'This is the staging page view table'
+        |LOCATION '/user/external/page_view'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src
+      """.stripMargin
+
+    val s2 =
+      """
+        |CREATE TABLE IF NOT EXISTS mydb.page_view
+        |USING parquet
+        |LOCATION '/user/external/page_view'
+        |COMMENT 'This is the staging page view table'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src
+      """.stripMargin
+
+    val s3 =
+      """
+        |CREATE TABLE IF NOT EXISTS mydb.page_view
+        |USING parquet
+        |COMMENT 'This is the staging page view table'
+        |LOCATION '/user/external/page_view'
+        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+        |AS SELECT * FROM src
+      """.stripMargin
+
+    checkParsing(s1)
+    checkParsing(s2)
+    checkParsing(s3)
+
+    def checkParsing(sql: String): Unit = {
+      val (desc, exists) = extractTableDesc(sql)
+      assert(exists)
+      assert(desc.identifier.database.contains("mydb"))
+      assert(desc.identifier.table == "page_view")
+      assert(desc.storage.locationUri.contains(new URI("/user/external/page_view")))
+      assert(desc.schema.isEmpty) // will be populated later when the table is actually created
+      assert(desc.comment.contains("This is the staging page view table"))
+      assert(desc.viewText.isEmpty)
+      assert(desc.viewDefaultDatabase.isEmpty)
+      assert(desc.viewQueryColumnNames.isEmpty)
+      assert(desc.partitionColumnNames.isEmpty)
+      assert(desc.provider.contains("parquet"))
+      assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 68f4b2d..877a0da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -73,6 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
         new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
         new FallbackOrcDataSourceV2(session) +:
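+        // Converts parsed CREATE TABLE statements into v1 CreateTable plans.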
+        DataSourceResolution(conf) +:
         customResolutionRules
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
