This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ebab0ef7c85 [SPARK-39859][SQL] Support v2 DESCRIBE TABLE EXTENDED for columns
ebab0ef7c85 is described below

commit ebab0ef7c8572e1dac41474c5991f482dbe9d253
Author: huaxingao <huaxin_...@apple.com>
AuthorDate: Thu Feb 16 20:40:58 2023 -0800

    [SPARK-39859][SQL] Support v2 DESCRIBE TABLE EXTENDED for columns
    
    ### What changes were proposed in this pull request?
    Support v2 DESCRIBE TABLE EXTENDED for columns
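
    A hypothetical usage sketch (the catalog/table names are placeholders, and the statistic rows only appear when the table's scan implements `SupportsReportStatistics`); the expected values mirror the updated `DescribeTableSuite` below:

    ```scala
    // Illustrative only: assumes a DSv2 catalog registered as `testcat` whose scans
    // report column-level statistics for the inserted data.
    spark.sql("CREATE TABLE testcat.ns.tbl (key INT COMMENT 'column_comment', col STRING)")
    spark.sql("INSERT INTO testcat.ns.tbl VALUES (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (null, 'ddd')")
    spark.sql("DESCRIBE TABLE EXTENDED testcat.ns.tbl key").show()
    // Rows returned (per the updated test expectations):
    //   col_name       -> key
    //   data_type      -> int
    //   comment        -> column_comment
    //   min            -> NULL
    //   max            -> NULL
    //   num_nulls      -> 1
    //   distinct_count -> 4
    //   avg_col_len    -> NULL
    //   max_col_len    -> NULL
    ```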
    
    ### Why are the changes needed?
    DS v1/v2 command parity
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test (updated `DescribeTableSuite`).
    
    Closes #40058 from huaxingao/describe_col.
    
    Authored-by: huaxingao <huaxin_...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../datasources/v2/DataSourceV2Strategy.scala      | 13 +++++--
 .../datasources/v2/DescribeColumnExec.scala        | 42 ++++++++++++++++++++--
 .../execution/command/v2/DescribeTableSuite.scala  | 11 ++++--
 3 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 29f0da1158f..757b66e1534 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog,
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
-import org.apache.spark.sql.connector.read.LocalScan
+import org.apache.spark.sql.connector.read.{LocalScan, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -329,10 +329,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       DescribeTableExec(output, r.table, isExtended) :: Nil
 
-    case DescribeColumn(_: ResolvedTable, column, isExtended, output) =>
+    case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
       column match {
         case c: Attribute =>
-          DescribeColumnExec(output, c, isExtended) :: Nil
+          val colStats =
+            r.table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
+            case s: SupportsReportStatistics =>
+              val stats = s.estimateStatistics()
+              Some(stats.columnStats().get(FieldReference.column(c.name)))
+            case _ => None
+          }
+          DescribeColumnExec(output, c, isExtended, colStats) :: Nil
         case nested =>
           throw QueryCompilationErrors.commandNotSupportNestedColumnError(
             "DESC TABLE COLUMN", toPrettySQL(nested))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
index 3be9b5c5471..491c214080a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
@@ -22,11 +22,13 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
 
 case class DescribeColumnExec(
     override val output: Seq[Attribute],
     column: Attribute,
-    isExtended: Boolean) extends LeafV2CommandExec {
+    isExtended: Boolean,
+    colStats: Option[ColumnStatistics] = None) extends LeafV2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
@@ -42,7 +44,43 @@ case class DescribeColumnExec(
       CharVarcharUtils.getRawType(column.metadata).getOrElse(column.dataType).catalogString)
     rows += toCatalystRow("comment", comment)
 
-    // TODO: The extended description (isExtended = true) can be added here.
+    if (isExtended && colStats.nonEmpty) {
+      if (colStats.get.min().isPresent) {
+        rows += toCatalystRow("min", colStats.get.min().toString)
+      } else {
+        rows += toCatalystRow("min", "NULL")
+      }
+
+      if (colStats.get.max().isPresent) {
+        rows += toCatalystRow("max", colStats.get.max().toString)
+      } else {
+        rows += toCatalystRow("max", "NULL")
+      }
+
+      if (colStats.get.nullCount().isPresent) {
+        rows += toCatalystRow("num_nulls", 
colStats.get.nullCount().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("num_nulls", "NULL")
+      }
+
+      if (colStats.get.distinctCount().isPresent) {
+        rows += toCatalystRow("distinct_count", 
colStats.get.distinctCount().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("distinct_count", "NULL")
+      }
+
+      if (colStats.get.avgLen().isPresent) {
+        rows += toCatalystRow("avg_col_len", 
colStats.get.avgLen().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("avg_col_len", "NULL")
+      }
+
+      if (colStats.get.maxLen().isPresent) {
+        rows += toCatalystRow("max_col_len", 
colStats.get.maxLen().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("max_col_len", "NULL")
+      }
+    }
 
     rows.toSeq
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index 334521a96e5..25363dcea69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -149,13 +149,14 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
     }
   }
 
-  // TODO(SPARK-39859): Support v2 `DESCRIBE TABLE EXTENDED` for columns
   test("describe extended (formatted) a column") {
     withNamespaceAndTable("ns", "tbl") { tbl =>
       sql(s"""
         |CREATE TABLE $tbl
         |(key INT COMMENT 'column_comment', col STRING)
         |$defaultUsing""".stripMargin)
+
+      sql(s"INSERT INTO $tbl values (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (null, 
'ddd')")
       val descriptionDf = sql(s"DESCRIBE TABLE EXTENDED $tbl key")
       assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
         ("info_name", StringType),
@@ -165,7 +166,13 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
         Seq(
           Row("col_name", "key"),
           Row("data_type", "int"),
-          Row("comment", "column_comment")))
+          Row("comment", "column_comment"),
+          Row("min", "NULL"),
+          Row("max", "NULL"),
+          Row("num_nulls", "1"),
+          Row("distinct_count", "4"),
+          Row("avg_col_len", "NULL"),
+          Row("max_col_len", "NULL")))
     }
   }
 }
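
For connector authors, a minimal sketch (not part of this commit) of a DSv2 scan that reports the column statistics the new DescribeColumnExec path reads. The interface and method names (SupportsReportStatistics, Statistics#columnStats, ColumnStatistics) are the ones used in the patch above; the class name, the assumption that unimplemented ColumnStatistics methods fall back to empty defaults, and the literal values are illustrative.

```scala
import java.util.OptionalLong

import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
import org.apache.spark.sql.types.StructType

// Hypothetical scan: reports table- and column-level statistics so that
// `DESCRIBE TABLE EXTENDED <tbl> <col>` can surface them.
class StatsReportingScan(schema: StructType) extends Scan with SupportsReportStatistics {

  override def readSchema(): StructType = schema

  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(1024L)
    override def numRows(): OptionalLong = OptionalLong.of(4L)

    // Keyed by column reference; DataSourceV2Strategy looks the described column up
    // with FieldReference.column(name), which equals the reference built here for a
    // top-level column.
    override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
      val stats = new java.util.HashMap[NamedReference, ColumnStatistics]()
      stats.put(Expressions.column("key"), new ColumnStatistics {
        override def nullCount(): OptionalLong = OptionalLong.of(1L)
        override def distinctCount(): OptionalLong = OptionalLong.of(4L)
        // min/max/avgLen/maxLen are left at their empty defaults, which
        // DescribeColumnExec renders as "NULL".
      })
      stats
    }
  }
}
```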


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
