Repository: spark Updated Branches: refs/heads/branch-2.0 7b62b7c11 -> 2dddec40d
[SPARK-14346][SQL] Native SHOW CREATE TABLE for Hive tables/views ## What changes were proposed in this pull request? This is a follow-up of #12781. It adds native `SHOW CREATE TABLE` support for Hive tables and views. A new field `hasUnsupportedFeatures` is added to `CatalogTable` to indicate whether all table metadata retrieved from the concrete underlying external catalog (i.e. Hive metastore in this case) can be mapped to fields in `CatalogTable`. This flag is useful when the target Hive table contains structures that can't be handled by Spark SQL, e.g. skewed columns and storage handlers. ## How was this patch tested? New test cases are added in `ShowCreateTableSuite` to do round-trip tests. Author: Cheng Lian <l...@databricks.com> Closes #13079 from liancheng/spark-14346-show-create-table-for-hive-tables. (cherry picked from commit b674e67c22bf663334e537e35787c00533adbb04) Signed-off-by: Yin Huai <yh...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dddec40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dddec40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dddec40 Branch: refs/heads/branch-2.0 Commit: 2dddec40d6562d1d16bb242bf7dc730431ee1e3e Parents: 7b62b7c Author: Cheng Lian <l...@databricks.com> Authored: Tue May 17 15:56:44 2016 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Tue May 17 15:56:57 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/catalog/interface.scala | 12 +- .../spark/sql/execution/command/tables.scala | 184 +++++++++++++----- .../spark/sql/hive/client/HiveClientImpl.scala | 10 +- .../spark/sql/hive/ShowCreateTableSuite.scala | 185 ++++++++++++++++++- 4 files changed, 333 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index d215655..d4f5cbb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -79,6 +79,12 @@ case class CatalogTablePartition( * * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the * future once we have a better understanding of how we want to handle skewed columns. + * + * @param hasUnsupportedFeatures is used to indicate whether all table metadata entries retrieved + * from the concrete underlying external catalog (e.g. Hive metastore) are supported by + * Spark SQL. For example, if the underlying Hive table has skewed columns, this information + * can't be mapped to [[CatalogTable]] since Spark SQL doesn't handle skewed columns for now. + * In this case `hasUnsupportedFeatures` is set to true. By default, it is false. 
*/ case class CatalogTable( identifier: TableIdentifier, @@ -95,7 +101,8 @@ case class CatalogTable( properties: Map[String, String] = Map.empty, viewOriginalText: Option[String] = None, viewText: Option[String] = None, - comment: Option[String] = None) { + comment: Option[String] = None, + hasUnsupportedFeatures: Boolean = false) { // Verify that the provided columns are part of the schema private val colNames = schema.map(_.name).toSet @@ -200,6 +207,7 @@ case class SimpleCatalogRelation( } } - require(metadata.identifier.database == Some(databaseName), + require( + metadata.identifier.database.contains(databaseName), "provided database does not match the one specified in the table definition") } http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index bb4f1ff..1fc02d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -626,40 +626,149 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { showCreateDataSourceTable(tableMetadata) } else { - throw new UnsupportedOperationException( - "SHOW CREATE TABLE only supports Spark SQL data source tables.") + showCreateHiveTable(tableMetadata) } Seq(Row(stmt)) } + private def showCreateHiveTable(metadata: CatalogTable): String = { + def reportUnsupportedError(): Unit = { + throw new UnsupportedOperationException( + s"Failed to execute SHOW CREATE TABLE against table ${metadata.identifier.quotedString}, " + + "because it contains table structure(s) (e.g. skewed columns) that Spark SQL doesn't " + + "support yet." 
+ ) + } + + if (metadata.hasUnsupportedFeatures) { + reportUnsupportedError() + } + + val builder = StringBuilder.newBuilder + + val tableTypeString = metadata.tableType match { + case EXTERNAL => " EXTERNAL TABLE" + case VIEW => " VIEW" + case MANAGED => " TABLE" + case INDEX => reportUnsupportedError() + } + + builder ++= s"CREATE$tableTypeString ${table.quotedString}" + + if (metadata.tableType == VIEW) { + if (metadata.schema.nonEmpty) { + builder ++= metadata.schema.map(_.name).mkString("(", ", ", ")") + } + builder ++= metadata.viewText.mkString(" AS\n", "", "\n") + } else { + showHiveTableHeader(metadata, builder) + showHiveTableNonDataColumns(metadata, builder) + showHiveTableStorageInfo(metadata, builder) + showHiveTableProperties(metadata, builder) + } + + builder.toString() + } + + private def showHiveTableHeader(metadata: CatalogTable, builder: StringBuilder): Unit = { + val columns = metadata.schema.filterNot { column => + metadata.partitionColumnNames.contains(column.name) + }.map(columnToDDLFragment) + + if (columns.nonEmpty) { + builder ++= columns.mkString("(", ", ", ")\n") + } + + metadata + .comment + .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n") + .foreach(builder.append) + } + + private def columnToDDLFragment(column: CatalogColumn): String = { + val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'") + s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}" + } + + private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = { + if (metadata.partitionColumns.nonEmpty) { + val partCols = metadata.partitionColumns.map(columnToDDLFragment) + builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n") + } + + if (metadata.bucketColumnNames.nonEmpty) { + throw new UnsupportedOperationException( + "Creating Hive table with bucket spec is not supported yet.") + } + } + + private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = { + val storage = metadata.storage + + storage.serde.foreach { serde => + builder ++= s"ROW FORMAT SERDE '$serde'\n" + + val serdeProps = metadata.storage.serdeProperties.map { + case (key, value) => + s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'" + } + + builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (", ",\n ", "\n)\n") + } + + if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) { + builder ++= "STORED AS\n" + + storage.inputFormat.foreach { format => + builder ++= s" INPUTFORMAT '${escapeSingleQuotedString(format)}'\n" + } + + storage.outputFormat.foreach { format => + builder ++= s" OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n" + } + } + + if (metadata.tableType == EXTERNAL) { + storage.locationUri.foreach { uri => + builder ++= s"LOCATION '$uri'\n" + } + } + } + + private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = { + if (metadata.properties.nonEmpty) { + val filteredProps = metadata.properties.filterNot { + // Skips "EXTERNAL" property for external tables + case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL + } + + val props = filteredProps.map { case (key, value) => + s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'" + } + + if (props.nonEmpty) { + builder ++= props.mkString("TBLPROPERTIES (", ",\n ", ")\n") + } + } + } + private def showCreateDataSourceTable(metadata: CatalogTable): String = { val builder = StringBuilder.newBuilder builder ++= s"CREATE TABLE 
${table.quotedString} " - showDataSourceTableDataCols(metadata, builder) + showDataSourceTableDataColumns(metadata, builder) showDataSourceTableOptions(metadata, builder) showDataSourceTableNonDataColumns(metadata, builder) builder.toString() } - private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = { - val props = metadata.properties - val schemaParts = for { - numParts <- props.get("spark.sql.sources.schema.numParts").toSeq - index <- 0 until numParts.toInt - } yield props.getOrElse( - s"spark.sql.sources.schema.part.$index", - throw new AnalysisException( - s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing." - ) - ) - - if (schemaParts.nonEmpty) { - val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields - val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") - builder ++= colTypeList.mkString("(", ", ", ")") + private def showDataSourceTableDataColumns( + metadata: CatalogTable, builder: StringBuilder): Unit = { + DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema => + val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") + builder ++= columns.mkString("(", ", ", ")") } builder ++= "\n" @@ -688,40 +797,21 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showDataSourceTableNonDataColumns( metadata: CatalogTable, builder: StringBuilder): Unit = { - val props = metadata.properties - - def getColumnNamesByType(colType: String, typeName: String): Seq[String] = { - (for { - numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq - index <- 0 until numCols.toInt - } yield props.getOrElse( - s"spark.sql.sources.schema.${colType}Col.$index", - throw new AnalysisException( - s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing." 
- ) - )).map(quoteIdentifier) - } - - val partCols = getColumnNamesByType("part", "partitioning columns") + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata) if (partCols.nonEmpty) { builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n" } - val bucketCols = getColumnNamesByType("bucket", "bucketing columns") - if (bucketCols.nonEmpty) { - builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n" - - val sortCols = getColumnNamesByType("sort", "sorting columns") - if (sortCols.nonEmpty) { - builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n" - } + DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec => + if (spec.bucketColumnNames.nonEmpty) { + builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n" - val numBuckets = props.getOrElse( - "spark.sql.sources.schema.numBuckets", - throw new AnalysisException("Corrupted bucket spec in catalog: missing bucket number") - ) + if (spec.sortColumnNames.nonEmpty) { + builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n" + } - builder ++= s"INTO $numBuckets BUCKETS\n" + builder ++= s"INTO ${spec.numBuckets} BUCKETS\n" + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 78c457b..a4e9f03 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -339,6 +339,13 @@ private[hive] class HiveClientImpl( // partition columns are part of the schema val partCols = h.getPartCols.asScala.map(fromHiveColumn) val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols + + // Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet) + val hasUnsupportedFeatures = + !h.getSkewedColNames.isEmpty || + h.getStorageHandler != null || + !h.getBucketCols.isEmpty + CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { @@ -365,7 +372,8 @@ private[hive] class HiveClientImpl( ), properties = h.getParameters.asScala.toMap, viewOriginalText = Option(h.getViewOriginalText), - viewText = Option(h.getViewExpandedText)) + viewText = Option(h.getViewExpandedText), + hasUnsupportedFeatures = hasUnsupportedFeatures) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2dddec40/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 12a1ad8..3b8068d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -116,24 +116,177 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing .bucketBy(2, "c", "d") .saveAsTable("ddl_test5") - checkCreateTable(TableIdentifier("ddl_test5", Some("default"))) + checkCreateTable("ddl_test5") } } + test("simple hive table") { + withTable("t1") { + sql( + s"""CREATE TABLE t1 ( + | c1 INT 
COMMENT 'bla', + | c2 STRING + |) + |TBLPROPERTIES ( + | 'prop1' = 'value1', + | 'prop2' = 'value2' + |) + """.stripMargin + ) + + checkCreateTable("t1") + } + } + + test("simple external hive table") { + withTempDir { dir => + withTable("t1") { + sql( + s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |LOCATION '$dir' + |TBLPROPERTIES ( + | 'prop1' = 'value1', + | 'prop2' = 'value2' + |) + """.stripMargin + ) + + checkCreateTable("t1") + } + } + } + + test("partitioned hive table") { + withTable("t1") { + sql( + s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |COMMENT 'bla' + |PARTITIONED BY ( + | p1 BIGINT COMMENT 'bla', + | p2 STRING + |) + """.stripMargin + ) + + checkCreateTable("t1") + } + } + + test("hive table with explicit storage info") { + withTable("t1") { + sql( + s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |COLLECTION ITEMS TERMINATED BY '@' + |MAP KEYS TERMINATED BY '#' + |NULL DEFINED AS 'NaN' + """.stripMargin + ) + + checkCreateTable("t1") + } + } + + test("hive table with STORED AS clause") { + withTable("t1") { + sql( + s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |STORED AS PARQUET + """.stripMargin + ) + + checkCreateTable("t1") + } + } + + test("hive table with serde info") { + withTable("t1") { + sql( + s"""CREATE TABLE t1 ( + | c1 INT COMMENT 'bla', + | c2 STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |WITH SERDEPROPERTIES ( + | 'mapkey.delim' = ',', + | 'field.delim' = ',' + |) + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin + ) + + checkCreateTable("t1") + } + } + + test("hive view") { + withView("v1") { + sql("CREATE VIEW v1 AS SELECT 1 AS a") + checkCreateView("v1") + } + } + + test("hive view with output columns") { + withView("v1") { + sql("CREATE VIEW v1 (b) AS SELECT 1 AS a") + checkCreateView("v1") + } + } + + test("hive bucketing not supported") { + withTable("t1") { + createRawHiveTable( + s"""CREATE TABLE t1 (a INT, b STRING) + |CLUSTERED BY (a) + |SORTED BY (b) + |INTO 2 BUCKETS + """.stripMargin + ) + + intercept[UnsupportedOperationException] { + sql("SHOW CREATE TABLE t1") + } + } + } + + private def createRawHiveTable(ddl: String): Unit = { + hiveContext.sharedState.metadataHive.runSqlHive(ddl) + } + private def checkCreateTable(table: String): Unit = { - checkCreateTable(TableIdentifier(table, Some("default"))) + checkCreateTableOrView(TableIdentifier(table, Some("default")), "TABLE") + } + + private def checkCreateView(table: String): Unit = { + checkCreateTableOrView(TableIdentifier(table, Some("default")), "VIEW") } - private def checkCreateTable(table: TableIdentifier): Unit = { + private def checkCreateTableOrView(table: TableIdentifier, checkType: String): Unit = { val db = table.database.getOrElse("default") val expected = spark.externalCatalog.getTable(db, table.table) val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0) - sql(s"DROP TABLE ${table.quotedString}") + sql(s"DROP $checkType ${table.quotedString}") - withTable(table.table) { + try { sql(shownDDL) val actual = spark.externalCatalog.getTable(db, table.table) checkCatalogTables(expected, actual) + } finally { + sql(s"DROP $checkType IF EXISTS ${table.table}") } } @@ -155,15 +308,31 @@ class ShowCreateTableSuite extends 
QueryTest with SQLTestUtils with TestHiveSing "totalSize", "totalNumberFiles", "maxFileSize", - "minFileSize" + "minFileSize", + // EXTERNAL is not non-deterministic, but it is filtered out for external tables. + "EXTERNAL" ) table.copy( createTime = 0L, lastAccessTime = 0L, - properties = table.properties.filterKeys(!nondeterministicProps.contains(_))) + properties = table.properties.filterKeys(!nondeterministicProps.contains(_)), + // View texts are checked separately + viewOriginalText = None, + viewText = None + ) + } + + // Normalizes attributes auto-generated by Spark SQL for views + def normalizeGeneratedAttributes(str: String): String = { + str.replaceAll("gen_attr_[0-9]+", "gen_attr_0") + } + + // We use expanded canonical view text as original view text of the new table + assertResult(expected.viewText.map(normalizeGeneratedAttributes)) { + actual.viewOriginalText.map(normalizeGeneratedAttributes) } - assert(normalize(expected) == normalize(actual)) + assert(normalize(actual) == normalize(expected)) } }
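For illustration only (not part of the diff above), a minimal usage sketch of the behaviour this commit enables. It assumes a Spark 2.0 session built with enableHiveSupport() and a reachable Hive metastore; the table name, columns, and properties below are made up for the example.

import org.apache.spark.sql.SparkSession

object ShowCreateHiveTableExample {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport() switches on the Hive catalog; a Hive metastore must be available.
    val spark = SparkSession.builder()
      .appName("show-create-table-example")
      .enableHiveSupport()
      .getOrCreate()

    // A plain (non data source) Hive table created through the Hive DDL path.
    spark.sql(
      """CREATE TABLE ddl_example (
        |  key INT COMMENT 'key column',
        |  value STRING
        |)
        |PARTITIONED BY (ds STRING)
        |STORED AS PARQUET
        |TBLPROPERTIES ('prop1' = 'value1')
      """.stripMargin)

    // Before this patch the command below threw UnsupportedOperationException for Hive
    // tables; with the patch it returns Hive-compatible CREATE TABLE DDL as a string.
    val ddl = spark.sql("SHOW CREATE TABLE ddl_example").head().getString(0)
    println(ddl)

    // Tables using features CatalogTable cannot represent (skewed columns, storage
    // handlers, Hive bucketing) get hasUnsupportedFeatures = true, and SHOW CREATE TABLE
    // still fails for them with UnsupportedOperationException.

    spark.stop()
  }
}

The round-trip tests in ShowCreateTableSuite follow the same pattern: run SHOW CREATE TABLE, drop the table or view, re-create it from the generated DDL, and compare the resulting CatalogTable with the original one.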