This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ebab0ef7c85 [SPARK-39859][SQL] Support v2 DESCRIBE TABLE EXTENDED for columns ebab0ef7c85 is described below commit ebab0ef7c8572e1dac41474c5991f482dbe9d253 Author: huaxingao <huaxin_...@apple.com> AuthorDate: Thu Feb 16 20:40:58 2023 -0800 [SPARK-39859][SQL] Support v2 DESCRIBE TABLE EXTENDED for columns ### What changes were proposed in this pull request? Support v2 DESCRIBE TABLE EXTENDED for columns ### Why are the changes needed? DS v1/v2 command parity ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #40058 from huaxingao/describe_col. Authored-by: huaxingao <huaxin_...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../datasources/v2/DataSourceV2Strategy.scala | 13 +++++-- .../datasources/v2/DescribeColumnExec.scala | 42 ++++++++++++++++++++-- .../execution/command/v2/DescribeTableSuite.scala | 11 ++++-- 3 files changed, 59 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 29f0da1158f..757b66e1534 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate} -import org.apache.spark.sql.connector.read.LocalScan +import org.apache.spark.sql.connector.read.{LocalScan, SupportsReportStatistics} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.connector.write.V1Write import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -329,10 +329,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } DescribeTableExec(output, r.table, isExtended) :: Nil - case DescribeColumn(_: ResolvedTable, column, isExtended, output) => + case DescribeColumn(r: ResolvedTable, column, isExtended, output) => column match { case c: Attribute => - DescribeColumnExec(output, c, isExtended) :: Nil + val colStats = + r.table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match { + case s: SupportsReportStatistics => + val stats = s.estimateStatistics() + Some(stats.columnStats().get(FieldReference.column(c.name))) + case _ => None + } + DescribeColumnExec(output, c, isExtended, colStats) :: Nil case nested => throw QueryCompilationErrors.commandNotSupportNestedColumnError( "DESC TABLE COLUMN", toPrettySQL(nested)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala index 3be9b5c5471..491c214080a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala @@ -22,11 +22,13 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.connector.read.colstats.ColumnStatistics case class DescribeColumnExec( override val output: Seq[Attribute], column: Attribute, - isExtended: Boolean) extends LeafV2CommandExec { + isExtended: Boolean, + colStats: Option[ColumnStatistics] = None) extends LeafV2CommandExec { override protected def run(): Seq[InternalRow] = { val rows = new ArrayBuffer[InternalRow]() @@ -42,7 +44,43 @@ case class DescribeColumnExec( CharVarcharUtils.getRawType(column.metadata).getOrElse(column.dataType).catalogString) rows += toCatalystRow("comment", comment) - // TODO: The extended description (isExtended = true) can be added here. + if (isExtended && colStats.nonEmpty) { + if (colStats.get.min().isPresent) { + rows += toCatalystRow("min", colStats.get.min().toString) + } else { + rows += toCatalystRow("min", "NULL") + } + + if (colStats.get.max().isPresent) { + rows += toCatalystRow("max", colStats.get.max().toString) + } else { + rows += toCatalystRow("max", "NULL") + } + + if (colStats.get.nullCount().isPresent) { + rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString) + } else { + rows += toCatalystRow("num_nulls", "NULL") + } + + if (colStats.get.distinctCount().isPresent) { + rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString) + } else { + rows += toCatalystRow("distinct_count", "NULL") + } + + if (colStats.get.avgLen().isPresent) { + rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString) + } else { + rows += toCatalystRow("avg_col_len", "NULL") + } + + if (colStats.get.maxLen().isPresent) { + rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString) + } else { + rows += toCatalystRow("max_col_len", "NULL") + } + } rows.toSeq } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala index 334521a96e5..25363dcea69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -149,13 +149,14 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase } } - // TODO(SPARK-39859): Support v2 `DESCRIBE TABLE EXTENDED` for columns test("describe extended (formatted) a column") { withNamespaceAndTable("ns", "tbl") { tbl => sql(s""" |CREATE TABLE $tbl |(key INT COMMENT 'column_comment', col STRING) |$defaultUsing""".stripMargin) + + sql(s"INSERT INTO $tbl values (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (null, 'ddd')") val descriptionDf = sql(s"DESCRIBE TABLE EXTENDED $tbl key") assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( ("info_name", StringType), @@ -165,7 +166,13 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase Seq( Row("col_name", "key"), Row("data_type", "int"), - Row("comment", "column_comment"))) + Row("comment", "column_comment"), + Row("min", "NULL"), + Row("max", "NULL"), + Row("num_nulls", "1"), + Row("distinct_count", "4"), + Row("avg_col_len", "NULL"), + Row("max_col_len", "NULL"))) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org