This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f0f35c8b1c8f [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns
f0f35c8b1c8f is described below

commit f0f35c8b1c8f3b1d7f7c2b79945eb29ffb3c8f9a
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Tue Feb 20 14:49:49 2024 +0800

    [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns
    
    ### What changes were proposed in this pull request?
    This PR proposes to add clustering column info as the output of `DESCRIBE TABLE`.
    
    ### Why are the changes needed?
    
    Currently, it is not easy to retrieve clustering column info; the only way to do so is via catalog APIs.
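    
    For reference, a minimal sketch of that workaround (it relies on internal, unstable catalyst/catalog APIs, and assumes a `spark` session such as the one provided by spark-shell; names may differ across releases):
    
    ```
    import org.apache.spark.sql.catalyst.TableIdentifier
    
    // Fetch the table metadata from the session catalog; if the table is
    // clustered, print each clustering column as a dotted field path.
    val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
    metadata.clusterBySpec.foreach { spec =>
      spec.columnNames.foreach(col => println(col.fieldNames.mkString(".")))
    }
    ```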
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Now, when you run `DESCRIBE TABLE` on clustered tables, you will see a "Clustering Information" section, as follows:
    
    ```
    CREATE TABLE tbl (col1 STRING, col2 INT) USING parquet CLUSTER BY (col1, col2);
    DESC tbl;
    
    +------------------------+---------+-------+
    |col_name                |data_type|comment|
    +------------------------+---------+-------+
    |col1                    |string   |NULL   |
    |col2                    |int      |NULL   |
    |# Clustering Information|         |       |
    |# col_name              |data_type|comment|
    |col1                    |string   |NULL   |
    |col2                    |int      |NULL   |
    +------------------------+---------+-------+
    ```
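    
    As a side note, downstream tooling can read the new section back out of the `DESC` result; a minimal sketch (not part of this patch, and assuming the marker row and header row shown above):
    
    ```
    // Collect the DESC rows, locate the "# Clustering Information" marker, skip the
    // marker and the "# col_name" header, then keep rows until the next "#" section
    // or an empty separator row.
    val rows = spark.sql("DESC tbl").collect()
    val start = rows.indexWhere(_.getString(0) == "# Clustering Information")
    val clusteringCols =
      if (start < 0) Seq.empty[String]
      else rows.drop(start + 2)
        .takeWhile(r => r.getString(0).nonEmpty && !r.getString(0).startsWith("#"))
        .map(_.getString(0)).toSeq
    ```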
    
    ### How was this patch tested?
    
    Added new unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45077 from imback82/describe_clustered_table.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     |  8 ++++-
 .../spark/sql/execution/command/tables.scala       | 24 +++++++++++++++
 .../datasources/v2/DescribeTableExec.scala         | 36 ++++++++++++++++++++--
 .../execution/command/DescribeTableSuiteBase.scala | 21 +++++++++++++
 4 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0a1a40a88522..10428877ba8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -224,6 +224,12 @@ object ClusterBySpec {
 
     ClusterBySpec(normalizedColumns)
   }
+
+  def extractClusterBySpec(transforms: Seq[Transform]): Option[ClusterBySpec] = {
+    transforms.collectFirst {
+      case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2f8fca7cfd73..fa288fd94ea9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -642,6 +642,7 @@ case class DescribeTableCommand(
       }
 
       describePartitionInfo(metadata, result)
+      describeClusteringInfo(metadata, result)
 
       if (partitionSpec.nonEmpty) {
         // Outputs the partition-specific info for the DDL command:
@@ -667,6 +668,29 @@ case class DescribeTableCommand(
     }
   }
 
+  private def describeClusteringInfo(
+      table: CatalogTable,
+      buffer: ArrayBuffer[Row]): Unit = {
+    table.clusterBySpec.foreach { clusterBySpec =>
+      append(buffer, "# Clustering Information", "", "")
+      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+      clusterBySpec.columnNames.map { fieldNames =>
+        val nestedField = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+        assert(nestedField.isDefined,
+          "The clustering column " +
+            s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " +
+            s"was not found in the table schema ${table.schema.catalogString}.")
+        nestedField.get
+      }.map { case (path, field) =>
+        append(
+          buffer,
+          (path :+ field.name).map(quoteIfNeeded).mkString("."),
+          field.dataType.simpleString,
+          field.getComment().orNull)
+      }
+    }
+  }
+
   private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     // The following information has been already shown in the previous outputs
     val excludedTableInfo = Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index a225dffb075b..7f7f280d8cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -21,11 +21,11 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, ClusterBySpec}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, SupportsRead, Table, TableCatalog}
-import org.apache.spark.sql.connector.expressions.IdentityTransform
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, IdentityTransform}
 import org.apache.spark.sql.connector.read.SupportsReportStatistics
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
@@ -38,6 +38,7 @@ case class DescribeTableExec(
     val rows = new ArrayBuffer[InternalRow]()
     addSchema(rows)
     addPartitioning(rows)
+    addClustering(rows)
 
     if (isExtended) {
       addMetadataColumns(rows)
@@ -99,6 +100,32 @@ case class DescribeTableExec(
     case _ =>
   }
 
+  private def addClusteringToRows(
+      clusterBySpec: ClusterBySpec,
+      rows: ArrayBuffer[InternalRow]): Unit = {
+    rows += toCatalystRow("# Clustering Information", "", "")
+    rows += toCatalystRow(s"# ${output.head.name}", output(1).name, output(2).name)
+    rows ++= clusterBySpec.columnNames.map { fieldNames =>
+      val nestedField = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+      assert(nestedField.isDefined,
+        "The clustering column " +
+          s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " +
+          s"was not found in the table schema ${table.schema.catalogString}.")
+      nestedField.get
+    }.map { case (path, field) =>
+      toCatalystRow(
+        (path :+ field.name).map(quoteIfNeeded).mkString("."),
+        field.dataType.simpleString,
+        field.getComment().orNull)
+    }
+  }
+
+  private def addClustering(rows: ArrayBuffer[InternalRow]): Unit = {
+    ClusterBySpec.extractClusterBySpec(table.partitioning.toIndexedSeq).foreach { clusterBySpec =>
+      addClusteringToRows(clusterBySpec, rows)
+    }
+  }
+
   private def addTableStats(rows: ArrayBuffer[InternalRow]): Unit = table match {
     case read: SupportsRead =>
       read.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
@@ -117,7 +144,10 @@ case class DescribeTableExec(
   }
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
-    if (table.partitioning.nonEmpty) {
+    // Clustering columns are handled in addClustering().
+    val partitioning = table.partitioning
+      .filter(t => !t.isInstanceOf[ClusterByTransform])
+    if (partitioning.nonEmpty) {
       val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
       if (partitionColumnsOnly) {
         rows += toCatalystRow("# Partition Information", "", "")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 4a7d5551fe52..c80cedca29f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -178,4 +178,25 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
       assert(errMsg === "DESC TABLE COLUMN does not support nested column: col.x.")
     }
   }
+
+  test("describe a clustered table") {
+    withNamespaceAndTable("ns", "tbl") { tbl =>
+      sql(s"CREATE TABLE $tbl (col1 STRING COMMENT 'this is comment', col2 struct<x:int, y:int>) " +
+        s"$defaultUsing CLUSTER BY (col1, col2.x)")
+      val descriptionDf = sql(s"DESC $tbl")
+      assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+        ("col_name", StringType),
+        ("data_type", StringType),
+        ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf,
+        Seq(
+          Row("col1", "string", "this is comment"),
+          Row("col2", "struct<x:int,y:int>", null),
+          Row("# Clustering Information", "", ""),
+          Row("# col_name", "data_type", "comment"),
+          Row("col1", "string", "this is comment"),
+          Row("col2.x", "int", null)))
+    }
+  }
 }

