Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b2b04c6da -> 0b14b3f13


[SPARK-14346] SHOW CREATE TABLE for data source tables

## What changes were proposed in this pull request?

This PR adds a native `SHOW CREATE TABLE` DDL command for data source tables.
Support for Hive tables will be added in follow-up PR(s).
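
As a minimal usage sketch (the data source table name `ddl_test` and the
surrounding `spark` session are assumed here for illustration), the generated
DDL comes back as a single row with one string column, `createtab_stmt`:

```scala
// Sketch only: assumes an existing SparkSession `spark` and an existing
// data source table named `ddl_test`. Hive tables are not supported yet.
val createStmt: String = spark
  .sql("SHOW CREATE TABLE ddl_test")
  .head()
  .getString(0)

println(createStmt)
```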

To show table creation DDL for data source tables created by CTAS statements,
this PR also adds partitioning and bucketing support to the normal `CREATE TABLE
... USING ...` syntax, so that the emitted statements can be parsed back.
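
As a concrete illustration (this mirrors the `ddl_test4` case in the new test
suite; the table name is changed for the example), a partitioned and bucketed
CTAS table like the one below now round-trips through `SHOW CREATE TABLE`,
because the command emits a plain `CREATE TABLE ... USING ...` statement with
`PARTITIONED BY` and a bucket spec:

```scala
// Mirrors the "partitioned bucketed data source table" case in
// ShowCreateTableSuite; the table name is hypothetical.
spark.sql(
  """CREATE TABLE ddl_example
    |USING json
    |PARTITIONED BY (c)
    |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
    |AS SELECT 1 AS a, "foo" AS b, 2.5 AS c
  """.stripMargin)

// The string returned by SHOW CREATE TABLE ddl_example can then be replayed
// to recreate an equivalent table definition.
```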

## How was this patch tested?

A new test suite, `ShowCreateTableSuite`, is added in the sql/hive package to
test the new feature.

Author: Cheng Lian <l...@databricks.com>

Closes #12781 from liancheng/spark-14346-show-create-table.

(cherry picked from commit f036dd7ce727b40877337da66d687214786c4f14)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b14b3f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b14b3f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b14b3f1

Branch: refs/heads/branch-2.0
Commit: 0b14b3f13c2575837ca47dc13adf3e15d88438b9
Parents: b2b04c6
Author: Cheng Lian <l...@databricks.com>
Authored: Wed May 11 20:44:04 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 11 20:44:17 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   5 +-
 .../apache/spark/sql/catalyst/identifiers.scala |   2 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  28 ++-
 .../spark/sql/execution/SparkStrategies.scala   |   6 +-
 .../spark/sql/execution/command/commands.scala  | 107 +-------
 .../command/createDataSourceTables.scala        |   6 +-
 .../spark/sql/execution/command/tables.scala    | 243 ++++++++++++++++++-
 .../spark/sql/execution/datasources/ddl.scala   |   2 +
 .../apache/spark/sql/internal/CatalogImpl.scala |   6 +-
 .../sql/execution/command/DDLCommandSuite.scala |   1 -
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   7 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   2 -
 .../spark/sql/hive/ShowCreateTableSuite.scala   | 169 +++++++++++++
 .../sql/hive/execution/HiveQuerySuite.scala     |   1 -
 14 files changed, 458 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index ffb7a09..06ac37b 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -45,7 +45,9 @@ statement
     | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     
#setDatabaseProperties
     | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      
#dropDatabase
     | createTableHeader ('(' colTypeList ')')? tableProvider
-        (OPTIONS tablePropertyList)?                                   
#createTableUsing
+        (OPTIONS tablePropertyList)?
+        (PARTITIONED BY partitionColumnNames=identifierList)?
+        bucketSpec?                                                    
#createTableUsing
     | createTableHeader tableProvider
         (OPTIONS tablePropertyList)?
         (PARTITIONED BY partitionColumnNames=identifierList)?
@@ -102,6 +104,7 @@ statement
         ((FROM | IN) db=identifier)?                                   
#showColumns
     | SHOW PARTITIONS tableIdentifier partitionSpec?                   
#showPartitions
     | SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))?         
#showFunctions
+    | SHOW CREATE TABLE tableIdentifier                                
#showCreateTable
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName            
#describeFunction
     | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
         tableIdentifier partitionSpec? describeColName?                
#describeTable

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 7d05845..d7b48ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -44,7 +44,7 @@ sealed trait IdentifierWithDatabase {
 /**
  * Identifies a table in a database.
  * If `database` is not defined, the current database is used.
- * When we register a permenent function in the FunctionRegistry, we use
+ * When we register a permanent function in the FunctionRegistry, we use
  * unquotedString as the function name.
  */
 case class TableIdentifier(table: String, database: Option[String])

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 53aba1f..b6e074b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -182,6 +182,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder 
{
   }
 
   /**
+   * Creates a [[ShowCreateTableCommand]]
+   */
+  override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan 
= withOrigin(ctx) {
+    val table = visitTableIdentifier(ctx.tableIdentifier())
+    ShowCreateTableCommand(table)
+  }
+
+  /**
    * Create a [[RefreshTable]] logical plan.
    */
   override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = 
withOrigin(ctx) {
@@ -287,6 +295,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder 
{
     }
     val options = 
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val partitionColumnNames =
+      Option(ctx.partitionColumnNames)
+        .map(visitIdentifierList(_).toArray)
+        .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
     if (ctx.query != null) {
@@ -302,16 +314,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
         SaveMode.ErrorIfExists
       }
 
-      val partitionColumnNames =
-        Option(ctx.partitionColumnNames)
-          .map(visitIdentifierList(_).toArray)
-          .getOrElse(Array.empty[String])
-
       CreateTableUsingAsSelect(
         table, provider, temp, partitionColumnNames, bucketSpec, mode, 
options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
-      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, 
managedIfNoPath = true)
+      CreateTableUsing(
+        table,
+        struct,
+        provider,
+        temp,
+        options,
+        partitionColumnNames,
+        bucketSpec,
+        ifNotExists,
+        managedIfNoPath = true)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9747e58..faf359f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -372,10 +372,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, 
opts, false, _) =>
+      case c: CreateTableUsing if c.temporary && !c.allowExisting =>
         ExecutedCommandExec(
           CreateTempTableUsing(
-            tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
+            c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil
 
       case c: CreateTableUsing if !c.temporary =>
         val cmd =
@@ -384,6 +384,8 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
             c.userSpecifiedSchema,
             c.provider,
             c.options,
+            c.partitionColumns,
+            c.bucketSpec,
             c.allowExisting,
             c.managedIfNoPath)
         ExecutedCommandExec(cmd) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 576e12a..d5aaccc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -17,19 +17,14 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.io.File
-
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, 
TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.types._
 
@@ -117,101 +112,3 @@ case class ExplainCommand(
     ("Error occurred during query planning: \n" + 
cause.getMessage).split("\n").map(Row(_))
   }
 }
-
-/**
- * A command to list the column names for a table. This function creates a
- * [[ShowColumnsCommand]] logical plan.
- *
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
- * }}}
- */
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
-  // The result of SHOW COLUMNS has one column called 'result'
-  override val output: Seq[Attribute] = {
-    AttributeReference("result", StringType, nullable = false)() :: Nil
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
-      Row(c.name)
-    }
-  }
-}
-
-/**
- * A command to list the partition names of a table. If the partition spec is 
specified,
- * partitions that match the spec are returned. [[AnalysisException]] 
exception is thrown under
- * the following conditions:
- *
- * 1. If the command is called for a non partitioned table.
- * 2. If the partition spec refers to the columns that are not defined as 
partitioning columns.
- *
- * This function creates a [[ShowPartitionsCommand]] logical plan
- *
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
- * }}}
- */
-case class ShowPartitionsCommand(
-    table: TableIdentifier,
-    spec: Option[TablePartitionSpec]) extends RunnableCommand {
-  // The result of SHOW PARTITIONS has one column called 'result'
-  override val output: Seq[Attribute] = {
-    AttributeReference("result", StringType, nullable = false)() :: Nil
-  }
-
-  private def getPartName(spec: TablePartitionSpec, partColNames: 
Seq[String]): String = {
-    partColNames.map { name =>
-      PartitioningUtils.escapePathName(name) + "=" + 
PartitioningUtils.escapePathName(spec(name))
-    }.mkString(File.separator)
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
-    val db = table.database.getOrElse(catalog.getCurrentDatabase)
-    if (catalog.isTemporaryTable(table)) {
-      throw new AnalysisException("SHOW PARTITIONS is not allowed on a 
temporary table: " +
-        s"${table.unquotedString}")
-    } else {
-      val tab = catalog.getTableMetadata(table)
-      /**
-       * Validate and throws an [[AnalysisException]] exception under the 
following conditions:
-       * 1. If the table is not partitioned.
-       * 2. If it is a datasource table.
-       * 3. If it is a view or index table.
-       */
-      if (tab.tableType == CatalogTableType.VIEW ||
-        tab.tableType == CatalogTableType.INDEX) {
-        throw new AnalysisException("SHOW PARTITIONS is not allowed on a view 
or index table: " +
-          s"${tab.qualifiedName}")
-      }
-      if (!DDLUtils.isTablePartitioned(tab)) {
-        throw new AnalysisException("SHOW PARTITIONS is not allowed on a table 
that is not " +
-          s"partitioned: ${tab.qualifiedName}")
-      }
-      if (DDLUtils.isDatasourceTable(tab)) {
-        throw new AnalysisException("SHOW PARTITIONS is not allowed on a 
datasource table: " +
-          s"${tab.qualifiedName}")
-      }
-      /**
-       * Validate the partitioning spec by making sure all the referenced 
columns are
-       * defined as partitioning columns in table definition. An 
AnalysisException exception is
-       * thrown if the partitioning spec is invalid.
-       */
-      if (spec.isDefined) {
-        val badColumns = 
spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
-        if (badColumns.nonEmpty) {
-          throw new AnalysisException(
-            s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
-              s"specified for SHOW PARTITIONS")
-        }
-      }
-      val partNames =
-        catalog.listPartitions(table, spec).map(p => getPartName(p.spec, 
tab.partitionColumnNames))
-      partNames.map { p => Row(p) }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3525111..de3c868 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -51,6 +51,8 @@ case class CreateDataSourceTableCommand(
     userSpecifiedSchema: Option[StructType],
     provider: String,
     options: Map[String, String],
+    partitionColumns: Array[String],
+    bucketSpec: Option[BucketSpec],
     ignoreIfExists: Boolean,
     managedIfNoPath: Boolean)
   extends RunnableCommand {
@@ -103,8 +105,8 @@ case class CreateDataSourceTableCommand(
       sparkSession = sparkSession,
       tableIdent = tableIdent,
       userSpecifiedSchema = userSpecifiedSchema,
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
+      partitionColumns = partitionColumns,
+      bucketSpec = bucketSpec,
       provider = provider,
       options = optionsWithPath,
       isExternal = isExternal)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0f90715..e6dcd1e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,10 +26,13 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, 
CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
UnaryNode}
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, 
StructType}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
@@ -490,3 +493,241 @@ case class ShowTablePropertiesCommand(table: 
TableIdentifier, propertyKey: Optio
     }
   }
 }
+
+/**
+ * A command to list the column names for a table. This function creates a
+ * [[ShowColumnsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+  // The result of SHOW COLUMNS has one column called 'result'
+  override val output: Seq[Attribute] = {
+    AttributeReference("result", StringType, nullable = false)() :: Nil
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+      Row(c.name)
+    }
+  }
+}
+
+/**
+ * A command to list the partition names of a table. If the partition spec is 
specified,
+ * partitions that match the spec are returned. [[AnalysisException]] 
exception is thrown under
+ * the following conditions:
+ *
+ * 1. If the command is called for a non partitioned table.
+ * 2. If the partition spec refers to the columns that are not defined as 
partitioning columns.
+ *
+ * This function creates a [[ShowPartitionsCommand]] logical plan
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
+ * }}}
+ */
+case class ShowPartitionsCommand(
+    table: TableIdentifier,
+    spec: Option[TablePartitionSpec]) extends RunnableCommand {
+  // The result of SHOW PARTITIONS has one column called 'result'
+  override val output: Seq[Attribute] = {
+    AttributeReference("result", StringType, nullable = false)() :: Nil
+  }
+
+  private def getPartName(spec: TablePartitionSpec, partColNames: 
Seq[String]): String = {
+    partColNames.map { name =>
+      PartitioningUtils.escapePathName(name) + "=" + 
PartitioningUtils.escapePathName(spec(name))
+    }.mkString(File.separator)
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+
+    if (catalog.isTemporaryTable(table)) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a temporary table: 
${table.unquotedString}")
+    }
+
+    val tab = catalog.getTableMetadata(table)
+
+    /**
+     * Validate and throws an [[AnalysisException]] exception under the 
following conditions:
+     * 1. If the table is not partitioned.
+     * 2. If it is a datasource table.
+     * 3. If it is a view or index table.
+     */
+    if (tab.tableType == VIEW ||
+      tab.tableType == INDEX) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a view or index table: 
${tab.qualifiedName}")
+    }
+
+    if (!DDLUtils.isTablePartitioned(tab)) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: 
${tab.qualifiedName}")
+    }
+
+    if (DDLUtils.isDatasourceTable(tab)) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a datasource table: 
${tab.qualifiedName}")
+    }
+
+    /**
+     * Validate the partitioning spec by making sure all the referenced 
columns are
+     * defined as partitioning columns in table definition. An 
AnalysisException exception is
+     * thrown if the partitioning spec is invalid.
+     */
+    if (spec.isDefined) {
+      val badColumns = 
spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+      if (badColumns.nonEmpty) {
+        val badCols = badColumns.mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Non-partitioning column(s) $badCols are specified for SHOW 
PARTITIONS")
+      }
+    }
+
+    val partNames = catalog.listPartitions(table, spec).map { p =>
+      getPartName(p.spec, tab.partitionColumnNames)
+    }
+
+    partNames.map(Row(_))
+  }
+}
+
+case class ShowCreateTableCommand(table: TableIdentifier) extends 
RunnableCommand {
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("createtab_stmt", StringType, nullable = false)()
+  )
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+
+    if (catalog.isTemporaryTable(table)) {
+      throw new AnalysisException(
+        s"SHOW CREATE TABLE cannot be applied to temporary table")
+    }
+
+    if (!catalog.tableExists(table)) {
+      throw new AnalysisException(s"Table $table doesn't exist")
+    }
+
+    val tableMetadata = catalog.getTableMetadata(table)
+
+    val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
+      showCreateDataSourceTable(tableMetadata)
+    } else {
+      throw new UnsupportedOperationException(
+        "SHOW CREATE TABLE only supports Spark SQL data source tables.")
+    }
+
+    Seq(Row(stmt))
+  }
+
+  private def showCreateDataSourceTable(metadata: CatalogTable): String = {
+    val builder = StringBuilder.newBuilder
+
+    builder ++= s"CREATE TABLE ${table.quotedString} "
+    showDataSourceTableDataCols(metadata, builder)
+    showDataSourceTableOptions(metadata, builder)
+    showDataSourceTableNonDataColumns(metadata, builder)
+
+    builder.toString()
+  }
+
+  private def showDataSourceTableDataCols(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
+    val props = metadata.properties
+    val schemaParts = for {
+      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
+      index <- 0 until numParts.toInt
+    } yield props.getOrElse(
+      s"spark.sql.sources.schema.part.$index",
+      throw new AnalysisException(
+        s"Corrupted schema in catalog: $numParts parts expected, but part 
$index is missing."
+      )
+    )
+
+    if (schemaParts.nonEmpty) {
+      val fields = 
DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
+      val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} 
${f.dataType.sql}")
+      builder ++= colTypeList.mkString("(", ", ", ")")
+    }
+
+    builder ++= "\n"
+  }
+
+  private def showDataSourceTableOptions(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
+    val props = metadata.properties
+
+    builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
+
+    val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
+      case (key, value) =>
+        // If it's a managed table, omit PATH option. Spark SQL always creates 
external table
+        // when the table creation DDL contains the PATH option.
+        key.toLowerCase == "path" && metadata.tableType == MANAGED
+    }.map {
+      case (key, value) => s"${quoteIdentifier(key)} 
'${escapeSingleQuotedString(value)}'"
+    }
+
+    if (dataSourceOptions.nonEmpty) {
+      builder ++= "OPTIONS (\n"
+      builder ++= dataSourceOptions.mkString("  ", ",\n  ", "\n")
+      builder ++= ")\n"
+    }
+  }
+
+  private def showDataSourceTableNonDataColumns(
+      metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val props = metadata.properties
+
+    def getColumnNamesByType(colType: String, typeName: String): Seq[String] = 
{
+      (for {
+        numCols <- 
props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+        index <- 0 until numCols.toInt
+      } yield props.getOrElse(
+        s"spark.sql.sources.schema.${colType}Col.$index",
+        throw new AnalysisException(
+          s"Corrupted $typeName in catalog: $numCols parts expected, but part 
$index is missing."
+        )
+      )).map(quoteIdentifier)
+    }
+
+    val partCols = getColumnNamesByType("part", "partitioning columns")
+    if (partCols.nonEmpty) {
+      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
+    }
+
+    val bucketCols = getColumnNamesByType("bucket", "bucketing columns")
+    if (bucketCols.nonEmpty) {
+      builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n"
+
+      val sortCols = getColumnNamesByType("sort", "sorting columns")
+      if (sortCols.nonEmpty) {
+        builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n"
+      }
+
+      val numBuckets = props.getOrElse(
+        "spark.sql.sources.schema.numBuckets",
+        throw new AnalysisException("Corrupted bucket spec in catalog: missing 
bucket number")
+      )
+
+      builder ++= s"INTO $numBuckets BUCKETS\n"
+    }
+  }
+
+  private def escapeSingleQuotedString(str: String): String = {
+    val builder = StringBuilder.newBuilder
+
+    str.foreach {
+      case '\'' => builder ++= s"\\\'"
+      case ch => builder += ch
+    }
+
+    builder.toString()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 7d0a3d9..3863be5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -38,6 +38,8 @@ case class CreateTableUsing(
     provider: String,
     temporary: Boolean,
     options: Map[String, String],
+    partitionColumns: Array[String],
+    bucketSpec: Option[BucketSpec],
     allowExisting: Boolean,
     managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index d08dca3..fdfb188 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -225,7 +225,9 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
         userSpecifiedSchema = None,
         source,
         temporary = false,
-        options,
+        options = options,
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
         allowExisting = false,
         managedIfNoPath = false)
     sparkSession.executePlan(cmd).toRdd
@@ -272,6 +274,8 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
         source,
         temporary = false,
         options,
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
         allowExisting = false,
         managedIfNoPath = false)
     sparkSession.executePlan(cmd).toRdd

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index aeb613a..13df449 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.internal.SQLConf
 
-
 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
   private val parser = new SparkSqlParser(new SQLConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0a3a80..8cfcec7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, 
CreateViewCommand}
+import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => 
ParquetDefaultSource, ParquetRelation}
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
@@ -44,7 +44,6 @@ import org.apache.spark.sql.types._
  * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends 
Logging {
-  private val conf = sparkSession.conf
   private val sessionState = 
sparkSession.sessionState.asInstanceOf[HiveSessionState]
   private val client = 
sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
 
@@ -110,7 +109,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
           schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
 
         // We only need names at here since userSpecifiedSchema we loaded from 
the metastore
-        // contains partition columns. We can always get datatypes of 
partitioning columns
+        // contains partition columns. We can always get data types of 
partitioning columns
         // from userSpecifiedSchema.
         val partitionColumns = getColumnNames("part")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 6d418c1..2f6aa36 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -25,10 +25,8 @@ import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry
-import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
new file mode 100644
index 0000000..12a1ad8
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+class ShowCreateTableSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+  import testImplicits._
+
+  test("data source table with user specified schema") {
+    withTable("ddl_test1") {
+      val jsonFilePath = 
Utils.getSparkClassLoader.getResource("sample.json").getFile
+
+      sql(
+        s"""CREATE TABLE ddl_test1 (
+           |  a STRING,
+           |  b STRING,
+           |  `extra col` ARRAY<INT>,
+           |  `<another>` STRUCT<x: INT, y: ARRAY<BOOLEAN>>
+           |)
+           |USING json
+           |OPTIONS (
+           | PATH '$jsonFilePath'
+           |)
+         """.stripMargin
+      )
+
+      checkCreateTable("ddl_test1")
+    }
+  }
+
+  test("data source table CTAS") {
+    withTable("ddl_test2") {
+      sql(
+        s"""CREATE TABLE ddl_test2
+           |USING json
+           |AS SELECT 1 AS a, "foo" AS b
+         """.stripMargin
+      )
+
+      checkCreateTable("ddl_test2")
+    }
+  }
+
+  test("partitioned data source table") {
+    withTable("ddl_test3") {
+      sql(
+        s"""CREATE TABLE ddl_test3
+           |USING json
+           |PARTITIONED BY (b)
+           |AS SELECT 1 AS a, "foo" AS b
+         """.stripMargin
+      )
+
+      checkCreateTable("ddl_test3")
+    }
+  }
+
+  test("bucketed data source table") {
+    withTable("ddl_test3") {
+      sql(
+        s"""CREATE TABLE ddl_test3
+           |USING json
+           |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+           |AS SELECT 1 AS a, "foo" AS b
+         """.stripMargin
+      )
+
+      checkCreateTable("ddl_test3")
+    }
+  }
+
+  test("partitioned bucketed data source table") {
+    withTable("ddl_test4") {
+      sql(
+        s"""CREATE TABLE ddl_test4
+           |USING json
+           |PARTITIONED BY (c)
+           |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+           |AS SELECT 1 AS a, "foo" AS b, 2.5 AS c
+         """.stripMargin
+      )
+
+      checkCreateTable("ddl_test4")
+    }
+  }
+
+  test("data source table using Dataset API") {
+    withTable("ddl_test5") {
+      spark
+        .range(3)
+        .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
+        .write
+        .mode("overwrite")
+        .partitionBy("a", "b")
+        .bucketBy(2, "c", "d")
+        .saveAsTable("ddl_test5")
+
+      checkCreateTable(TableIdentifier("ddl_test5", Some("default")))
+    }
+  }
+
+  private def checkCreateTable(table: String): Unit = {
+    checkCreateTable(TableIdentifier(table, Some("default")))
+  }
+
+  private def checkCreateTable(table: TableIdentifier): Unit = {
+    val db = table.database.getOrElse("default")
+    val expected = spark.externalCatalog.getTable(db, table.table)
+    val shownDDL = sql(s"SHOW CREATE TABLE 
${table.quotedString}").head().getString(0)
+    sql(s"DROP TABLE ${table.quotedString}")
+
+    withTable(table.table) {
+      sql(shownDDL)
+      val actual = spark.externalCatalog.getTable(db, table.table)
+      checkCatalogTables(expected, actual)
+    }
+  }
+
+  private def checkCatalogTables(expected: CatalogTable, actual: 
CatalogTable): Unit = {
+    def normalize(table: CatalogTable): CatalogTable = {
+      val nondeterministicProps = Set(
+        "CreateTime",
+        "transient_lastDdlTime",
+        "grantTime",
+        "lastUpdateTime",
+        "last_modified_by",
+        "last_modified_time",
+        "Owner:",
+        "COLUMN_STATS_ACCURATE",
+        // The following are hive specific schema parameters which we do not 
need to match exactly.
+        "numFiles",
+        "numRows",
+        "rawDataSize",
+        "totalSize",
+        "totalNumberFiles",
+        "maxFileSize",
+        "minFileSize"
+      )
+
+      table.copy(
+        createTime = 0L,
+        lastAccessTime = 0L,
+        properties = 
table.properties.filterKeys(!nondeterministicProps.contains(_)))
+    }
+
+    assert(normalize(expected) == normalize(actual))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index bbb775e..19f8cb3 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1166,7 +1166,6 @@ class HiveQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
   }
 
   test("some show commands are not supported") {
-    assertUnsupportedFeature { sql("SHOW CREATE TABLE my_table") }
     assertUnsupportedFeature { sql("SHOW COMPACTIONS") }
     assertUnsupportedFeature { sql("SHOW TRANSACTIONS") }
     assertUnsupportedFeature { sql("SHOW INDEXES ON my_table") }

