Repository: spark
Updated Branches:
  refs/heads/master 957558235 -> 515910e9b

[SPARK-17642][SQL] support DESC EXTENDED/FORMATTED table column commands

## What changes were proposed in this pull request?

Support DESC (EXTENDED | FORMATTED)? TABLE COLUMN commands, which show column-level statistics when EXTENDED or FORMATTED is specified. Describing nested columns is not supported.

## How was this patch tested?

Added test cases.

Author: Zhenhua Wang <wzh_...@163.com>
Author: Zhenhua Wang <wangzhen...@huawei.com>
Author: wangzhenhua <wangzhen...@huawei.com>

Closes #16422 from wzhfy/descColumn.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/515910e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/515910e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/515910e9

Branch: refs/heads/master
Commit: 515910e9bdc1d1b7f0fc05cadc6aeb3a58860e2d
Parents: 9575582
Author: Zhenhua Wang <wzh_...@163.com>
Authored: Tue Sep 12 08:59:52 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Sep 12 08:59:52 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   2 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  14 +-
 .../spark/sql/execution/command/tables.scala    |  72 +++++++-
 .../sql-tests/inputs/describe-table-column.sql  |  35 ++++
 .../results/describe-table-column.sql.out       | 184 +++++++++++++++++++
 .../apache/spark/sql/SQLQueryTestSuite.scala    |  10 +-
 .../sql/execution/SparkSqlParserSuite.scala     |  28 ++-
 7 files changed, 332 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 239e73e..33bc79a 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -270,7 +270,7 @@ describeFuncName
     ;
 
 describeColName
-    : identifier ('.' (identifier | STRING))*
+    : nameParts+=identifier ('.' nameParts+=identifier)*
     ;
 
 ctes

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d38919b..6de9ea0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -330,10 +330,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[DescribeTableCommand]] logical plan.
    */
   override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    // Describe column are not supported yet. Return null and let the parser decide
-    // what to do with this (create an exception or pass it on to a different system).
+    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
     if (ctx.describeColName != null) {
-      null
+      if (ctx.partitionSpec != null) {
+        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
+      } else {
+        DescribeColumnCommand(
+          visitTableIdentifier(ctx.tableIdentifier),
+          ctx.describeColName.nameParts.asScala.map(_.getText),
+          isExtended)
+      }
     } else {
       val partitionSpec = if (ctx.partitionSpec != null) {
         // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
@@ -348,7 +354,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       DescribeTableCommand(
         visitTableIdentifier(ctx.tableIdentifier),
         partitionSpec,
-        ctx.EXTENDED != null || ctx.FORMATTED != null)
+        isExtended)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1dddc1c..da0c815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -631,6 +631,74 @@ case class DescribeTableCommand(
   }
 }
 
+/**
+ * A command to list the info for a column, including name, data type, column stats and comment.
+ * This function creates a [[DescribeColumnCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   DESCRIBE [EXTENDED|FORMATTED] table_name column_name;
+ * }}}
+ */
+case class DescribeColumnCommand(
+    table: TableIdentifier,
+    colNameParts: Seq[String],
+    isExtended: Boolean)
+  extends RunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    Seq(
+      AttributeReference("info_name", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "name of the column info").build())(),
+      AttributeReference("info_value", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "value of the column info").build())()
+    )
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val resolver = sparkSession.sessionState.conf.resolver
+    val relation = sparkSession.table(table).queryExecution.analyzed
+
+    val colName = UnresolvedAttribute(colNameParts).name
+    val field = {
+      relation.resolve(colNameParts, resolver).getOrElse {
+        throw new AnalysisException(s"Column $colName does not exist")
+      }
+    }
+    if (!field.isInstanceOf[Attribute]) {
+      // If the field is not an attribute after `resolve`, then it's a nested field.
+      throw new AnalysisException(
+        s"DESC TABLE COLUMN command does not support nested data types: $colName")
+    }
+
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+    val colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty)
+    val cs = colStats.get(field.name)
+
+    val comment = if (field.metadata.contains("comment")) {
+      Option(field.metadata.getString("comment"))
+    } else {
+      None
+    }
+
+    val buffer = ArrayBuffer[Row](
+      Row("col_name", field.name),
+      Row("data_type", field.dataType.catalogString),
+      Row("comment", comment.getOrElse("NULL"))
+    )
+    if (isExtended) {
+      // Show column stats when EXTENDED or FORMATTED is specified.
+      buffer += Row("min", cs.flatMap(_.min.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("max", cs.flatMap(_.max.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("num_nulls", cs.map(_.nullCount.toString).getOrElse("NULL"))
+      buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL"))
+      buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL"))
+      buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL"))
+    }
+    buffer
+  }
+}
 
 /**
  * A command for users to get tables in the given database.
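
For context on how the new command behaves end to end, a minimal sketch follows. It is not part of the patch; the SparkSession setup and the demo_tbl/key names are illustrative, chosen to mirror the SQL tests added below. Until ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS has run, the EXTENDED rows come back as NULL, as the generated .sql.out file below also shows.

    import org.apache.spark.sql.SparkSession

    object DescColumnDemo {
      def main(args: Array[String]): Unit = {
        // Illustrative local session; any existing SparkSession works the same way.
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("desc-column-demo")
          .getOrCreate()

        spark.sql("CREATE TABLE demo_tbl (key INT COMMENT 'column_comment') USING PARQUET")

        // Column-level stats are read from the catalog, so they only appear after ANALYZE.
        spark.sql("ANALYZE TABLE demo_tbl COMPUTE STATISTICS FOR COLUMNS key")

        // Plain DESC returns col_name / data_type / comment; EXTENDED (or FORMATTED)
        // appends min, max, num_nulls, distinct_count, avg_col_len and max_col_len.
        spark.sql("DESC EXTENDED demo_tbl key").show(truncate = false)

        spark.stop()
      }
    }
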
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
new file mode 100644
index 0000000..24870de
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
@@ -0,0 +1,35 @@
+-- Test temp table
+CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET;
+
+DESC desc_col_temp_table key;
+
+DESC EXTENDED desc_col_temp_table key;
+
+DESC FORMATTED desc_col_temp_table key;
+
+-- Describe a column with qualified name
+DESC FORMATTED desc_col_temp_table desc_col_temp_table.key;
+
+-- Describe a non-existent column
+DESC desc_col_temp_table key1;
+
+-- Test persistent table
+CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET;
+
+ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key;
+
+DESC desc_col_table key;
+
+DESC EXTENDED desc_col_table key;
+
+DESC FORMATTED desc_col_table key;
+
+-- Test complex columns
+CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET;
+
+DESC FORMATTED desc_col_complex_table `a.b`;
+
+DESC FORMATTED desc_col_complex_table col;
+
+-- Describe a nested column
+DESC FORMATTED desc_col_complex_table col.x;

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
new file mode 100644
index 0000000..a51eef7
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
@@ -0,0 +1,184 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 15
+
+
+-- !query 0
+CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+DESC desc_col_temp_table key
+-- !query 1 schema
+struct<info_name:string,info_value:string>
+-- !query 1 output
+col_name key
+data_type int
+comment column_comment
+
+
+-- !query 2
+DESC EXTENDED desc_col_temp_table key
+-- !query 2 schema
+struct<info_name:string,info_value:string>
+-- !query 2 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 3
+DESC FORMATTED desc_col_temp_table key
+-- !query 3 schema
+struct<info_name:string,info_value:string>
+-- !query 3 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 4
+DESC FORMATTED desc_col_temp_table desc_col_temp_table.key
+-- !query 4 schema
+struct<info_name:string,info_value:string>
+-- !query 4 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 5
+DESC desc_col_temp_table key1
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Column key1 does not exist;
+
+
+-- !query 6
+CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET
+-- !query 6 schema
+struct<>
+-- !query 6 output
+
+
+
+-- !query 7
+ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DESC desc_col_table key
+-- !query 8 schema
+struct<info_name:string,info_value:string>
+-- !query 8 output
+col_name key
+data_type int
+comment column_comment
+
+
+-- !query 9
+DESC EXTENDED desc_col_table key
+-- !query 9 schema
+struct<info_name:string,info_value:string>
+-- !query 9 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls 0
+distinct_count 0
+avg_col_len 4
+max_col_len 4
+
+
+-- !query 10
+DESC FORMATTED desc_col_table key
+-- !query 10 schema
+struct<info_name:string,info_value:string>
+-- !query 10 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls 0
+distinct_count 0
+avg_col_len 4
+max_col_len 4
+
+
+-- !query 11
+CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET
+-- !query 11 schema
+struct<>
+-- !query 11 output
+
+
+
+-- !query 12
+DESC FORMATTED desc_col_complex_table `a.b`
+-- !query 12 schema
+struct<info_name:string,info_value:string>
+-- !query 12 output
+col_name a.b
+data_type int
+comment NULL
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 13
+DESC FORMATTED desc_col_complex_table col
+-- !query 13 schema
+struct<info_name:string,info_value:string>
+-- !query 13 output
+col_name col
+data_type struct<x:int,y:string>
+comment NULL
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 14
+DESC FORMATTED desc_col_complex_table col.x
+-- !query 14 schema
+struct<>
+-- !query 14 output
+org.apache.spark.sql.AnalysisException
+DESC TABLE COLUMN command does not support nested data types: col.x;

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index aa000bd..e3901af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
-import org.apache.spark.sql.execution.command.DescribeTableCommand
+import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeTableCommand}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType
 
@@ -214,11 +214,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
   /** Executes a query and returns the result as (schema of the output, normalized output). */
   private def getNormalizedResult(session: SparkSession, sql: String): (StructType, Seq[String]) = {
     // Returns true if the plan is supposed to be sorted.
-    def needSort(plan: LogicalPlan): Boolean = plan match {
+    def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case _: DescribeTableCommand => true
+      case _: DescribeTableCommand | _: DescribeColumnCommand => true
       case PhysicalOperation(_, _, Sort(_, true, _)) => true
-      case _ => plan.children.iterator.exists(needSort)
+      case _ => plan.children.iterator.exists(isSorted)
     }
 
     try {
@@ -233,7 +233,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           .replaceAll("Last Access.*", s"Last Access $notIncludedMsg"))
 
       // If the output is not pre-sorted, sort it.
-      if (needSort(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
+      if (isSorted(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
     } catch {
       case a: AnalysisException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index fa7a866..107a2f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -249,8 +249,34 @@ class SparkSqlParserSuite extends AnalysisTest {
     assertEqual("describe table formatted t",
       DescribeTableCommand(
         TableIdentifier("t"), Map.empty, isExtended = true))
+  }
+
+  test("describe table column") {
+    assertEqual("DESCRIBE t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = false))
+    assertEqual("DESCRIBE t `abc.xyz`",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("abc.xyz"), isExtended = false))
+    assertEqual("DESCRIBE t abc.xyz",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("abc", "xyz"), isExtended = false))
+    assertEqual("DESCRIBE t `a.b`.`x.y`",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("a.b", "x.y"), isExtended = false))
+
+    assertEqual("DESCRIBE TABLE t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = false))
+    assertEqual("DESCRIBE TABLE EXTENDED t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = true))
+    assertEqual("DESCRIBE TABLE FORMATTED t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = true))
 
-    intercept("explain describe tables x", "Unsupported SQL statement")
+    intercept("DESCRIBE TABLE t PARTITION (ds='1970-01-01') col",
+      "DESC TABLE COLUMN for a specific partition is not supported")
   }
 
   test("analyze table statistics") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org