beliefer commented on code in PR #35041:
URL: https://github.com/apache/spark/pull/35041#discussion_r848351608


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##########
@@ -325,3 +339,136 @@ case class Percentile(
     frequencyExpression = newThird
   )
 }
+
+/**
+ * Return a percentile value based on a continuous distribution of
+ * numeric or ansi interval column at the given percentage (specified in ORDER 
BY clause).
+ * The value of percentage must be between 0.0 and 1.0.
+ */
+case class PercentileCont(left: Expression, right: Expression)
+  extends AggregateFunction
+  with RuntimeReplaceableAggregate
+  with ImplicitCastInputTypes
+  with BinaryLike[Expression] {
+  private lazy val percentile = new Percentile(left, right)
+  override def replacement: Expression = percentile
+  override def nodeName: String = "percentile_cont"
+  override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): PercentileCont =
+    this.copy(left = newLeft, right = newRight)
+}
+
+/**
+ * The Percentile aggregate function returns the percentile(s) based on a 
discrete distribution of
+ * numeric column `expr` at the given percentage(s) with value range in [0.0, 
1.0].
+ *
+ * Because the number of elements and their partial order cannot be determined 
in advance.
+ * Therefore we have to store all the elements in memory, and so notice that 
too many elements can
+ * cause GC paused and eventually OutOfMemory Errors.
+ */
+private[sql] case class PercentileDisc private(
+    child: Expression,
+    percentageExpression: Expression,
+    frequencyExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends PercentileBase {
+
+  def this(child: Expression, percentageExpression: Expression) = {
+    this(child, percentageExpression, Literal(1L))
+  }
+
+  def this(child: Expression, percentageExpression: Expression, reverse: 
Boolean) = {
+    this(child, percentageExpression, Literal(1L), reverse)
+  }
+
+  private lazy val frequency = frequencyExpression.eval()
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!frequencyExpression.foldable) {
+      // percentageExpression must be foldable
+      TypeCheckFailure("The frequency of PercentileDisc must be a constant 
literal, " +
+        s"but got $frequencyExpression")
+    } else if (frequency != 1) {
+      TypeCheckFailure("Frequency value must be 1 for PercentileDisc")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def prettyName: String = "percentile_disc"
+
+  override protected def getPercentiles(sortedCounts: Seq[(AnyRef, Long)]): 
Seq[Double] = {

Review Comment:
   Yeah. The algorithm is different. I need to refactor it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to