beliefer commented on code in PR #35041: URL: https://github.com/apache/spark/pull/35041#discussion_r848351608
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala: ########## @@ -325,3 +339,136 @@ case class Percentile( frequencyExpression = newThird ) } + +/** + * Return a percentile value based on a continuous distribution of + * numeric or ansi interval column at the given percentage (specified in ORDER BY clause). + * The value of percentage must be between 0.0 and 1.0. + */ +case class PercentileCont(left: Expression, right: Expression) + extends AggregateFunction + with RuntimeReplaceableAggregate + with ImplicitCastInputTypes + with BinaryLike[Expression] { + private lazy val percentile = new Percentile(left, right) + override def replacement: Expression = percentile + override def nodeName: String = "percentile_cont" + override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes + override protected def withNewChildrenInternal( + newLeft: Expression, newRight: Expression): PercentileCont = + this.copy(left = newLeft, right = newRight) +} + +/** + * The Percentile aggregate function returns the percentile(s) based on a discrete distribution of + * numeric column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Because the number of elements and their partial order cannot be determined in advance, we have + * to store all the elements in memory; note that too many elements can cause GC pauses and + * eventually an OutOfMemoryError.
+ */ +private[sql] case class PercentileDisc private( + child: Expression, + percentageExpression: Expression, + frequencyExpression: Expression, + reverse: Boolean = false, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) extends PercentileBase { + + def this(child: Expression, percentageExpression: Expression) = { + this(child, percentageExpression, Literal(1L)) + } + + def this(child: Expression, percentageExpression: Expression, reverse: Boolean) = { + this(child, percentageExpression, Literal(1L), reverse) + } + + private lazy val frequency = frequencyExpression.eval() + + override def checkInputDataTypes(): TypeCheckResult = { + val defaultCheck = super.checkInputDataTypes() + if (defaultCheck.isFailure) { + defaultCheck + } else if (!frequencyExpression.foldable) { + // frequencyExpression must be foldable + TypeCheckFailure("The frequency of PercentileDisc must be a constant literal, " + + s"but got $frequencyExpression") + } else if (frequency != 1) { + TypeCheckFailure("Frequency value must be 1 for PercentileDisc") + } else { + TypeCheckSuccess + } + } + + override def prettyName: String = "percentile_disc" + + override protected def getPercentiles(sortedCounts: Seq[(AnyRef, Long)]): Seq[Double] = { Review Comment: Yeah. The algorithm is different. I need to refactor it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org