beliefer commented on code in PR #35041: URL: https://github.com/apache/spark/pull/35041#discussion_r851572853
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala: ########## @@ -325,3 +339,133 @@ case class Percentile( frequencyExpression = newThird ) } + +/** + * Return a percentile value based on a continuous distribution of + * numeric or ANSI interval column at the given percentage (specified in ORDER BY clause). + * The value of percentage must be between 0.0 and 1.0. + */ +case class PercentileCont(left: Expression, right: Expression) + extends AggregateFunction + with RuntimeReplaceableAggregate + with ImplicitCastInputTypes + with BinaryLike[Expression] { + private lazy val percentile = new Percentile(left, right) + override def replacement: Expression = percentile + override def nodeName: String = "percentile_cont" + override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes + override protected def withNewChildrenInternal( + newLeft: Expression, newRight: Expression): PercentileCont = + this.copy(left = newLeft, right = newRight) +} + +/** + * The Percentile aggregate function returns the percentile(s) based on a discrete distribution of + * numeric column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Because the number of elements and their partial order cannot be determined in advance, + * we have to store all the elements in memory. Note that too many elements can + * cause GC pauses and eventually OutOfMemory errors. 
+ */ +case class PercentileDisc private( + child: Expression, + percentageExpression: Expression, + frequencyExpression: Expression, + reverse: Boolean = false, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) extends PercentileBase { + + def this(child: Expression, percentageExpression: Expression) = { + this(child, percentageExpression, Literal(1L)) + } + + def this(child: Expression, percentageExpression: Expression, reverse: Boolean) = { + this(child, percentageExpression, Literal(1L), reverse) + } + + override def checkInputDataTypes(): TypeCheckResult = { + val defaultCheck = super.checkInputDataTypes() + if (defaultCheck.isFailure) { + defaultCheck + } else { + assert(frequencyExpression == Literal(1L)) Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org