This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f0f35c8b1c8f  [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns
f0f35c8b1c8f is described below

commit f0f35c8b1c8f3b1d7f7c2b79945eb29ffb3c8f9a
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Tue Feb 20 14:49:49 2024 +0800

    [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns

    ### What changes were proposed in this pull request?

    This PR proposes to add clustering column info to the output of `DESCRIBE TABLE`.

    ### Why are the changes needed?

    Currently, it's not easy to retrieve clustering column info; it is only available via catalog APIs.

    ### Does this PR introduce _any_ user-facing change?

    Yes. Now, when you run `DESCRIBE TABLE` on clustered tables, you will see a "Clustering Information" section, as follows:

    ```
    CREATE TABLE tbl (col1 STRING, col2 INT) using parquet CLUSTER BY (col1, col2);
    DESC tbl;
    +------------------------+---------+-------+
    |col_name                |data_type|comment|
    +------------------------+---------+-------+
    |col1                    |string   |NULL   |
    |col2                    |int      |NULL   |
    |# Clustering Information|         |       |
    |# col_name              |data_type|comment|
    |col1                    |string   |NULL   |
    |col2                    |int      |NULL   |
    +------------------------+---------+-------+
    ```

    ### How was this patch tested?

    Added new unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #45077 from imback82/describe_clustered_table.

    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala      |  8 ++++-
 .../spark/sql/execution/command/tables.scala        | 24 +++++++++++++++
 .../datasources/v2/DescribeTableExec.scala          | 36 ++++++++++++++++++++--
 .../execution/command/DescribeTableSuiteBase.scala  | 21 +++++++++++++
 4 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0a1a40a88522..10428877ba8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -224,6 +224,12 @@ object ClusterBySpec {
 
     ClusterBySpec(normalizedColumns)
   }
+
+  def extractClusterBySpec(transforms: Seq[Transform]): Option[ClusterBySpec] = {
+    transforms.collectFirst {
+      case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2f8fca7cfd73..fa288fd94ea9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -642,6 +642,7 @@ case class DescribeTableCommand(
       }
 
       describePartitionInfo(metadata, result)
+      describeClusteringInfo(metadata, result)
 
       if (partitionSpec.nonEmpty) {
         // Outputs the partition-specific info for the DDL command:
@@ -667,6 +668,29 @@ case class DescribeTableCommand(
     }
   }
 
+  private def describeClusteringInfo(
+      table: CatalogTable,
+      buffer: ArrayBuffer[Row]): Unit = {
+    table.clusterBySpec.foreach { clusterBySpec =>
+      append(buffer, "# Clustering Information", "", "")
+      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+      clusterBySpec.columnNames.map { fieldNames =>
+        val nestedField = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+        assert(nestedField.isDefined,
+          "The clustering column " +
+            s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " +
+            s"was not found in the table schema ${table.schema.catalogString}.")
+        nestedField.get
+      }.map { case (path, field) =>
+        append(
+          buffer,
+          (path :+ field.name).map(quoteIfNeeded).mkString("."),
+          field.dataType.simpleString,
+          field.getComment().orNull)
+      }
+    }
+  }
+
   private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     // The following information has been already shown in the previous outputs
     val excludedTableInfo = Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index a225dffb075b..7f7f280d8cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -21,11 +21,11 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, ClusterBySpec}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, SupportsRead, Table, TableCatalog}
-import org.apache.spark.sql.connector.expressions.IdentityTransform
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, IdentityTransform}
 import org.apache.spark.sql.connector.read.SupportsReportStatistics
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
@@ -38,6 +38,7 @@ case class DescribeTableExec(
     val rows = new ArrayBuffer[InternalRow]()
     addSchema(rows)
     addPartitioning(rows)
+    addClustering(rows)
 
     if (isExtended) {
       addMetadataColumns(rows)
@@ -99,6 +100,32 @@ case class DescribeTableExec(
     case _ =>
   }
 
+  private def addClusteringToRows(
+      clusterBySpec: ClusterBySpec,
+      rows: ArrayBuffer[InternalRow]): Unit = {
+    rows += toCatalystRow("# Clustering Information", "", "")
+    rows += toCatalystRow(s"# ${output.head.name}", output(1).name, output(2).name)
+    rows ++= clusterBySpec.columnNames.map { fieldNames =>
+      val nestedField = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+      assert(nestedField.isDefined,
+        "The clustering column " +
+          s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " +
+          s"was not found in the table schema ${table.schema.catalogString}.")
+      nestedField.get
+    }.map { case (path, field) =>
+      toCatalystRow(
+        (path :+ field.name).map(quoteIfNeeded).mkString("."),
+        field.dataType.simpleString,
+        field.getComment().orNull)
+    }
+  }
+
+  private def addClustering(rows: ArrayBuffer[InternalRow]): Unit = {
+    ClusterBySpec.extractClusterBySpec(table.partitioning.toIndexedSeq).foreach { clusterBySpec =>
+      addClusteringToRows(clusterBySpec, rows)
+    }
+  }
+
   private def addTableStats(rows: ArrayBuffer[InternalRow]): Unit = table match {
     case read: SupportsRead =>
       read.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
@@ -117,7 +144,10 @@ case class DescribeTableExec(
   }
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
-    if (table.partitioning.nonEmpty) {
+    // Clustering columns are handled in addClustering().
+    val partitioning = table.partitioning
+      .filter(t => !t.isInstanceOf[ClusterByTransform])
+    if (partitioning.nonEmpty) {
       val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
       if (partitionColumnsOnly) {
         rows += toCatalystRow("# Partition Information", "", "")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 4a7d5551fe52..c80cedca29f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -178,4 +178,25 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
       assert(errMsg === "DESC TABLE COLUMN does not support nested column: col.x.")
     }
   }
+
+  test("describe a clustered table") {
+    withNamespaceAndTable("ns", "tbl") { tbl =>
+      sql(s"CREATE TABLE $tbl (col1 STRING COMMENT 'this is comment', col2 struct<x:int, y:int>) " +
+        s"$defaultUsing CLUSTER BY (col1, col2.x)")
+      val descriptionDf = sql(s"DESC $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf,
+        Seq(
+          Row("col1", "string", "this is comment"),
+          Row("col2", "struct<x:int,y:int>", null),
+          Row("# Clustering Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("col1", "string", "this is comment"),
+          Row("col2.x", "int", null)))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org