Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19479#discussion_r147672869 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -89,19 +93,159 @@ case class AnalyzeColumnCommand( // The first element in the result will be the overall row count, the following elements // will be structs containing all column stats. // The layout of each struct follows the layout of the ColumnStats. - val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError val expressions = Count(Literal(1)).toAggregateExpression() +: - attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr)) + attributesToAnalyze.map(statExprs(_, sparkSession.sessionState.conf)) val namedExpressions = expressions.map(e => Alias(e, e.toString)()) val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation)) .executedPlan.executeTake(1).head val rowCount = statsRow.getLong(0) - val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) => - // according to `ColumnStat.statExprs`, the stats struct always have 6 fields. - (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6), attr)) - }.toMap - (rowCount, columnStats) + val colStats = rowToColumnStats(sparkSession, relation, attributesToAnalyze, statsRow, rowCount) + (rowCount, colStats) + } + + /** + * Constructs an expression to compute column statistics for a given column. + * + * The expression should create a single struct column with the following schema: + * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long, + * percentiles: Array[T] + * + * Together with [[rowToColumnStats]], this function is used to create [[ColumnStat]] and + * as a result should stay in sync with it. 
+ */ + private def statExprs(col: Attribute, conf: SQLConf): CreateNamedStruct = { + def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => + expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } + }) + val one = Literal(1, LongType) + + // the approximate ndv (num distinct value) should never be larger than the number of rows + val numNonNulls = if (col.nullable) Count(col) else Count(one) + val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls)) + val numNulls = Subtract(Count(one), numNonNulls) + val defaultSize = Literal(col.dataType.defaultSize, LongType) + val nullArray = Literal(null, ArrayType(DoubleType)) + + def fixedLenTypeExprs(castType: DataType) = { + // For fixed width types, avg size should be the same as max size. + Seq(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize, + defaultSize) + } + + def fixedLenTypeStruct(castType: DataType, genHistogram: Boolean) = { + val percentileExpr = if (genHistogram) { + // To generate equi-height histogram, we need to: + // 1. get percentiles p(1/n), p(2/n) ... p((n-1)/n), + // 2. use min, max, and percentiles as range values of buckets, e.g. [min, p(1/n)], + // [p(1/n), p(2/n)] ... [p((n-1)/n), max], and then count ndv in each bucket. + // Step 2 will be performed in `rowToColumnStats`. --- End diff -- This is hacky, can we explicitly calculate percentiles at the beginning?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org