Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19479#discussion_r147672869
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
    @@ -89,19 +93,159 @@ case class AnalyzeColumnCommand(
         // The first element in the result will be the overall row count, the 
following elements
         // will be structs containing all column stats.
         // The layout of each struct follows the layout of the ColumnStats.
    -    val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
         val expressions = Count(Literal(1)).toAggregateExpression() +:
    -      attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
    +      attributesToAnalyze.map(statExprs(_, sparkSession.sessionState.conf))
     
         val namedExpressions = expressions.map(e => Alias(e, e.toString)())
         val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
           .executedPlan.executeTake(1).head
     
         val rowCount = statsRow.getLong(0)
    -    val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, 
i) =>
    -      // according to `ColumnStat.statExprs`, the stats struct always have 
6 fields.
    -      (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6), 
attr))
    -    }.toMap
    -    (rowCount, columnStats)
    +    val colStats = rowToColumnStats(sparkSession, relation, 
attributesToAnalyze, statsRow, rowCount)
    +    (rowCount, colStats)
    +  }
    +
    +  /**
    +   * Constructs an expression to compute column statistics for a given 
column.
    +   *
    +   * The expression should create a single struct column with the 
following schema:
    +   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, 
maxLen: Long,
    +   * percentiles: Array[T]
    +   *
    +   * Together with [[rowToColumnStats]], this function is used to create 
[[ColumnStat]] and
    +   * as a result should stay in sync with it.
    +   */
    +  private def statExprs(col: Attribute, conf: SQLConf): CreateNamedStruct 
= {
    +    def struct(exprs: Expression*): CreateNamedStruct = 
CreateStruct(exprs.map { expr =>
    +      expr.transformUp { case af: AggregateFunction => 
af.toAggregateExpression() }
    +    })
    +    val one = Literal(1, LongType)
    +
    +    // the approximate ndv (num distinct value) should never be larger 
than the number of rows
    +    val numNonNulls = if (col.nullable) Count(col) else Count(one)
    +    val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), 
numNonNulls))
    +    val numNulls = Subtract(Count(one), numNonNulls)
    +    val defaultSize = Literal(col.dataType.defaultSize, LongType)
    +    val nullArray = Literal(null, ArrayType(DoubleType))
    +
    +    def fixedLenTypeExprs(castType: DataType) = {
    +      // For fixed width types, avg size should be the same as max size.
    +      Seq(ndv, Cast(Min(col), castType), Cast(Max(col), castType), 
numNulls, defaultSize,
    +        defaultSize)
    +    }
    +
    +    def fixedLenTypeStruct(castType: DataType, genHistogram: Boolean) = {
    +      val percentileExpr = if (genHistogram) {
    +        // To generate equi-height histogram, we need to:
    +        // 1. get percentiles p(1/n), p(2/n) ... p((n-1)/n),
    +        // 2. use min, max, and percentiles as range values of buckets, 
e.g. [min, p(1/n)],
    +        // [p(1/n), p(2/n)] ... [p((n-1)/n), max], and then count ndv in 
each bucket.
    +        // Step 2 will be performed in `rowToColumnStats`.
    --- End diff --
    
    This is hacky; can we explicitly calculate the percentiles at the beginning instead?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to