Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7b62b7c11 -> 2dddec40d


[SPARK-14346][SQL] Native SHOW CREATE TABLE for Hive tables/views

## What changes were proposed in this pull request?

This is a follow-up to #12781. It adds native `SHOW CREATE TABLE` support for 
Hive tables and views. A new field, `hasUnsupportedFeatures`, is added to 
`CatalogTable` to indicate whether all table metadata retrieved from the 
underlying external catalog (i.e., the Hive metastore in this case) can be 
mapped to fields in `CatalogTable`. This flag is useful when the target Hive 
table contains structures that Spark SQL can't handle yet, e.g., skewed 
columns or a storage handler.
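
As an illustration (hypothetical table name and DDL, not part of this patch; 
assumes a `SparkSession` with Hive support), a table created through Hive DDL 
can now be passed to `SHOW CREATE TABLE` directly:

```scala
// Hypothetical example: a partitioned Hive table created via Hive DDL.
spark.sql(
  """CREATE TABLE page_views (
    |  user_id INT COMMENT 'visitor id',
    |  url STRING
    |)
    |PARTITIONED BY (dt STRING)
    |STORED AS PARQUET
    |TBLPROPERTIES ('owner' = 'web-team')
  """.stripMargin)

// Previously this threw UnsupportedOperationException for non data source tables;
// now it returns a single row containing an equivalent CREATE TABLE statement.
spark.sql("SHOW CREATE TABLE page_views").show(truncate = false)
```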

## How was this patch tested?

New test cases are added in `ShowCreateTableSuite` to do round-trip tests: each 
test re-creates the table from the DDL returned by `SHOW CREATE TABLE` and 
checks that the resulting catalog metadata matches the original.
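
Roughly, the round-trip check follows this simplified sketch (variable names 
and the table name are illustrative, not the suite's exact code):

```scala
// Simplified sketch of the round-trip performed by the new tests:
// capture the generated DDL, drop the table, re-create it from that DDL,
// then compare the original and re-created catalog metadata.
val shownDDL = spark.sql("SHOW CREATE TABLE page_views").head().getString(0)
spark.sql("DROP TABLE page_views")
spark.sql(shownDDL)
// The two CatalogTable entries should match after normalizing non-deterministic
// properties such as creation time and file statistics.
```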

Author: Cheng Lian <l...@databricks.com>

Closes #13079 from liancheng/spark-14346-show-create-table-for-hive-tables.

(cherry picked from commit b674e67c22bf663334e537e35787c00533adbb04)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dddec40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dddec40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dddec40

Branch: refs/heads/branch-2.0
Commit: 2dddec40d6562d1d16bb242bf7dc730431ee1e3e
Parents: 7b62b7c
Author: Cheng Lian <l...@databricks.com>
Authored: Tue May 17 15:56:44 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue May 17 15:56:57 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  12 +-
 .../spark/sql/execution/command/tables.scala    | 184 +++++++++++++-----
 .../spark/sql/hive/client/HiveClientImpl.scala  |  10 +-
 .../spark/sql/hive/ShowCreateTableSuite.scala   | 185 ++++++++++++++++++-
 4 files changed, 333 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d215655..d4f5cbb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -79,6 +79,12 @@ case class CatalogTablePartition(
  *
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
+ *
+ * @param hasUnsupportedFeatures is used to indicate whether all table metadata entries retrieved
+ *        from the concrete underlying external catalog (e.g. Hive metastore) are supported by
+ *        Spark SQL. For example, if the underlying Hive table has skewed columns, this information
+ *        can't be mapped to [[CatalogTable]] since Spark SQL doesn't handle skewed columns for now.
+ *        In this case `hasUnsupportedFeatures` is set to true. By default, it is false.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -95,7 +101,8 @@ case class CatalogTable(
     properties: Map[String, String] = Map.empty,
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
-    comment: Option[String] = None) {
+    comment: Option[String] = None,
+    hasUnsupportedFeatures: Boolean = false) {
 
   // Verify that the provided columns are part of the schema
   private val colNames = schema.map(_.name).toSet
@@ -200,6 +207,7 @@ case class SimpleCatalogRelation(
     }
   }
 
-  require(metadata.identifier.database == Some(databaseName),
+  require(
+    metadata.identifier.database.contains(databaseName),
     "provided database does not match the one specified in the table 
definition")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bb4f1ff..1fc02d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -626,40 +626,149 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
     val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
       showCreateDataSourceTable(tableMetadata)
     } else {
-      throw new UnsupportedOperationException(
-        "SHOW CREATE TABLE only supports Spark SQL data source tables.")
+      showCreateHiveTable(tableMetadata)
     }
 
     Seq(Row(stmt))
   }
 
+  private def showCreateHiveTable(metadata: CatalogTable): String = {
+    def reportUnsupportedError(): Unit = {
+      throw new UnsupportedOperationException(
+        s"Failed to execute SHOW CREATE TABLE against table 
${metadata.identifier.quotedString}, " +
+          "because it contains table structure(s) (e.g. skewed columns) that 
Spark SQL doesn't " +
+          "support yet."
+      )
+    }
+
+    if (metadata.hasUnsupportedFeatures) {
+      reportUnsupportedError()
+    }
+
+    val builder = StringBuilder.newBuilder
+
+    val tableTypeString = metadata.tableType match {
+      case EXTERNAL => " EXTERNAL TABLE"
+      case VIEW => " VIEW"
+      case MANAGED => " TABLE"
+      case INDEX => reportUnsupportedError()
+    }
+
+    builder ++= s"CREATE$tableTypeString ${table.quotedString}"
+
+    if (metadata.tableType == VIEW) {
+      if (metadata.schema.nonEmpty) {
+        builder ++= metadata.schema.map(_.name).mkString("(", ", ", ")")
+      }
+      builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
+    } else {
+      showHiveTableHeader(metadata, builder)
+      showHiveTableNonDataColumns(metadata, builder)
+      showHiveTableStorageInfo(metadata, builder)
+      showHiveTableProperties(metadata, builder)
+    }
+
+    builder.toString()
+  }
+
+  private def showHiveTableHeader(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val columns = metadata.schema.filterNot { column =>
+      metadata.partitionColumnNames.contains(column.name)
+    }.map(columnToDDLFragment)
+
+    if (columns.nonEmpty) {
+      builder ++= columns.mkString("(", ", ", ")\n")
+    }
+
+    metadata
+      .comment
+      .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
+      .foreach(builder.append)
+  }
+
+  private def columnToDDLFragment(column: CatalogColumn): String = {
+    val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
+    s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
+  }
+
+  private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    if (metadata.partitionColumns.nonEmpty) {
+      val partCols = metadata.partitionColumns.map(columnToDDLFragment)
+      builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
+    }
+
+    if (metadata.bucketColumnNames.nonEmpty) {
+      throw new UnsupportedOperationException(
+        "Creating Hive table with bucket spec is not supported yet.")
+    }
+  }
+
+  private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val storage = metadata.storage
+
+    storage.serde.foreach { serde =>
+      builder ++= s"ROW FORMAT SERDE '$serde'\n"
+
+      val serdeProps = metadata.storage.serdeProperties.map {
+        case (key, value) =>
+          s"'${escapeSingleQuotedString(key)}' = 
'${escapeSingleQuotedString(value)}'"
+      }
+
+      builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (", ",\n  ", 
"\n)\n")
+    }
+
+    if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) {
+      builder ++= "STORED AS\n"
+
+      storage.inputFormat.foreach { format =>
+        builder ++= s"  INPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
+      }
+
+      storage.outputFormat.foreach { format =>
+        builder ++= s"  OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
+      }
+    }
+
+    if (metadata.tableType == EXTERNAL) {
+      storage.locationUri.foreach { uri =>
+        builder ++= s"LOCATION '$uri'\n"
+      }
+    }
+  }
+
+  private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    if (metadata.properties.nonEmpty) {
+      val filteredProps = metadata.properties.filterNot {
+        // Skips "EXTERNAL" property for external tables
+        case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
+      }
+
+      val props = filteredProps.map { case (key, value) =>
+        s"'${escapeSingleQuotedString(key)}' = 
'${escapeSingleQuotedString(value)}'"
+      }
+
+      if (props.nonEmpty) {
+        builder ++= props.mkString("TBLPROPERTIES (", ",\n  ", ")\n")
+      }
+    }
+  }
+
   private def showCreateDataSourceTable(metadata: CatalogTable): String = {
     val builder = StringBuilder.newBuilder
 
     builder ++= s"CREATE TABLE ${table.quotedString} "
-    showDataSourceTableDataCols(metadata, builder)
+    showDataSourceTableDataColumns(metadata, builder)
     showDataSourceTableOptions(metadata, builder)
     showDataSourceTableNonDataColumns(metadata, builder)
 
     builder.toString()
   }
 
-  private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val props = metadata.properties
-    val schemaParts = for {
-      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
-      index <- 0 until numParts.toInt
-    } yield props.getOrElse(
-      s"spark.sql.sources.schema.part.$index",
-      throw new AnalysisException(
-        s"Corrupted schema in catalog: $numParts parts expected, but part 
$index is missing."
-      )
-    )
-
-    if (schemaParts.nonEmpty) {
-      val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
-      val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
-      builder ++= colTypeList.mkString("(", ", ", ")")
+  private def showDataSourceTableDataColumns(
+      metadata: CatalogTable, builder: StringBuilder): Unit = {
+    DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
+      val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+      builder ++= columns.mkString("(", ", ", ")")
     }
 
     builder ++= "\n"
@@ -688,40 +797,21 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showDataSourceTableNonDataColumns(
       metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val props = metadata.properties
-
-    def getColumnNamesByType(colType: String, typeName: String): Seq[String] = {
-      (for {
-        numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
-        index <- 0 until numCols.toInt
-      } yield props.getOrElse(
-        s"spark.sql.sources.schema.${colType}Col.$index",
-        throw new AnalysisException(
-          s"Corrupted $typeName in catalog: $numCols parts expected, but part 
$index is missing."
-        )
-      )).map(quoteIdentifier)
-    }
-
-    val partCols = getColumnNamesByType("part", "partitioning columns")
+    val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata)
     if (partCols.nonEmpty) {
       builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
     }
 
-    val bucketCols = getColumnNamesByType("bucket", "bucketing columns")
-    if (bucketCols.nonEmpty) {
-      builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n"
-
-      val sortCols = getColumnNamesByType("sort", "sorting columns")
-      if (sortCols.nonEmpty) {
-        builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n"
-      }
+    DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec =>
+      if (spec.bucketColumnNames.nonEmpty) {
+        builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", 
", ")")}\n"
 
-      val numBuckets = props.getOrElse(
-        "spark.sql.sources.schema.numBuckets",
-        throw new AnalysisException("Corrupted bucket spec in catalog: missing 
bucket number")
-      )
+        if (spec.sortColumnNames.nonEmpty) {
+          builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", 
")")}\n"
+        }
 
-      builder ++= s"INTO $numBuckets BUCKETS\n"
+        builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 78c457b..a4e9f03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -339,6 +339,13 @@ private[hive] class HiveClientImpl(
       // partition columns are part of the schema
       val partCols = h.getPartCols.asScala.map(fromHiveColumn)
       val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
+
+      // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
+      val hasUnsupportedFeatures =
+        !h.getSkewedColNames.isEmpty ||
+          h.getStorageHandler != null ||
+          !h.getBucketCols.isEmpty
+
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -365,7 +372,8 @@ private[hive] class HiveClientImpl(
         ),
         properties = h.getParameters.asScala.toMap,
         viewOriginalText = Option(h.getViewOriginalText),
-        viewText = Option(h.getViewExpandedText))
+        viewText = Option(h.getViewExpandedText),
+        hasUnsupportedFeatures = hasUnsupportedFeatures)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index 12a1ad8..3b8068d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -116,24 +116,177 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         .bucketBy(2, "c", "d")
         .saveAsTable("ddl_test5")
 
-      checkCreateTable(TableIdentifier("ddl_test5", Some("default")))
+      checkCreateTable("ddl_test5")
     }
   }
 
+  test("simple hive table") {
+    withTable("t1") {
+      sql(
+        s"""CREATE TABLE t1 (
+           |  c1 INT COMMENT 'bla',
+           |  c2 STRING
+           |)
+           |TBLPROPERTIES (
+           |  'prop1' = 'value1',
+           |  'prop2' = 'value2'
+           |)
+         """.stripMargin
+      )
+
+      checkCreateTable("t1")
+    }
+  }
+
+  test("simple external hive table") {
+    withTempDir { dir =>
+      withTable("t1") {
+        sql(
+          s"""CREATE TABLE t1 (
+             |  c1 INT COMMENT 'bla',
+             |  c2 STRING
+             |)
+             |LOCATION '$dir'
+             |TBLPROPERTIES (
+             |  'prop1' = 'value1',
+             |  'prop2' = 'value2'
+             |)
+           """.stripMargin
+        )
+
+        checkCreateTable("t1")
+      }
+    }
+  }
+
+  test("partitioned hive table") {
+    withTable("t1") {
+      sql(
+        s"""CREATE TABLE t1 (
+           |  c1 INT COMMENT 'bla',
+           |  c2 STRING
+           |)
+           |COMMENT 'bla'
+           |PARTITIONED BY (
+           |  p1 BIGINT COMMENT 'bla',
+           |  p2 STRING
+           |)
+         """.stripMargin
+      )
+
+      checkCreateTable("t1")
+    }
+  }
+
+  test("hive table with explicit storage info") {
+    withTable("t1") {
+      sql(
+        s"""CREATE TABLE t1 (
+           |  c1 INT COMMENT 'bla',
+           |  c2 STRING
+           |)
+           |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+           |COLLECTION ITEMS TERMINATED BY '@'
+           |MAP KEYS TERMINATED BY '#'
+           |NULL DEFINED AS 'NaN'
+         """.stripMargin
+      )
+
+      checkCreateTable("t1")
+    }
+  }
+
+  test("hive table with STORED AS clause") {
+    withTable("t1") {
+      sql(
+        s"""CREATE TABLE t1 (
+           |  c1 INT COMMENT 'bla',
+           |  c2 STRING
+           |)
+           |STORED AS PARQUET
+         """.stripMargin
+      )
+
+      checkCreateTable("t1")
+    }
+  }
+
+  test("hive table with serde info") {
+    withTable("t1") {
+      sql(
+        s"""CREATE TABLE t1 (
+           |  c1 INT COMMENT 'bla',
+           |  c2 STRING
+           |)
+           |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+           |WITH SERDEPROPERTIES (
+           |  'mapkey.delim' = ',',
+           |  'field.delim' = ','
+           |)
+           |STORED AS
+           |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+           |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+         """.stripMargin
+      )
+
+      checkCreateTable("t1")
+    }
+  }
+
+  test("hive view") {
+    withView("v1") {
+      sql("CREATE VIEW v1 AS SELECT 1 AS a")
+      checkCreateView("v1")
+    }
+  }
+
+  test("hive view with output columns") {
+    withView("v1") {
+      sql("CREATE VIEW v1 (b) AS SELECT 1 AS a")
+      checkCreateView("v1")
+    }
+  }
+
+  test("hive bucketing not supported") {
+    withTable("t1") {
+      createRawHiveTable(
+        s"""CREATE TABLE t1 (a INT, b STRING)
+           |CLUSTERED BY (a)
+           |SORTED BY (b)
+           |INTO 2 BUCKETS
+         """.stripMargin
+      )
+
+      intercept[UnsupportedOperationException] {
+        sql("SHOW CREATE TABLE t1")
+      }
+    }
+  }
+
+  private def createRawHiveTable(ddl: String): Unit = {
+    hiveContext.sharedState.metadataHive.runSqlHive(ddl)
+  }
+
   private def checkCreateTable(table: String): Unit = {
-    checkCreateTable(TableIdentifier(table, Some("default")))
+    checkCreateTableOrView(TableIdentifier(table, Some("default")), "TABLE")
+  }
+
+  private def checkCreateView(table: String): Unit = {
+    checkCreateTableOrView(TableIdentifier(table, Some("default")), "VIEW")
   }
 
-  private def checkCreateTable(table: TableIdentifier): Unit = {
+  private def checkCreateTableOrView(table: TableIdentifier, checkType: String): Unit = {
     val db = table.database.getOrElse("default")
     val expected = spark.externalCatalog.getTable(db, table.table)
     val shownDDL = sql(s"SHOW CREATE TABLE 
${table.quotedString}").head().getString(0)
-    sql(s"DROP TABLE ${table.quotedString}")
+    sql(s"DROP $checkType ${table.quotedString}")
 
-    withTable(table.table) {
+    try {
       sql(shownDDL)
       val actual = spark.externalCatalog.getTable(db, table.table)
       checkCatalogTables(expected, actual)
+    } finally {
+      sql(s"DROP $checkType IF EXISTS ${table.table}")
     }
   }
 
@@ -155,15 +308,31 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         "totalSize",
         "totalNumberFiles",
         "maxFileSize",
-        "minFileSize"
+        "minFileSize",
+        // EXTERNAL is not non-deterministic, but it is filtered out for external tables.
+        "EXTERNAL"
       )
 
       table.copy(
         createTime = 0L,
         lastAccessTime = 0L,
-        properties = table.properties.filterKeys(!nondeterministicProps.contains(_)))
+        properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
+        // View texts are checked separately
+        viewOriginalText = None,
+        viewText = None
+      )
+    }
+
+    // Normalizes attributes auto-generated by Spark SQL for views
+    def normalizeGeneratedAttributes(str: String): String = {
+      str.replaceAll("gen_attr_[0-9]+", "gen_attr_0")
+    }
+
+    // We use expanded canonical view text as original view text of the new table
+    assertResult(expected.viewText.map(normalizeGeneratedAttributes)) {
+      actual.viewOriginalText.map(normalizeGeneratedAttributes)
     }
 
-    assert(normalize(expected) == normalize(actual))
+    assert(normalize(actual) == normalize(expected))
   }
 }

