This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 639301a933f [SPARK-37691][SQL] Support ANSI Aggregation Function: `percentile_disc`
639301a933f is described below

commit 639301a933f3f7b0a4cc2c1defb6c843afae180e
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Tue Apr 19 20:56:56 2022 +0800

    [SPARK-37691][SQL] Support ANSI Aggregation Function: `percentile_disc`
    
    ### What changes were proposed in this pull request?
    `PERCENTILE_DISC` is an ANSI aggregate function.
    
    Mainstream databases that support `percentile_disc` are shown below:
    **PostgreSQL**
    https://www.postgresql.org/docs/9.4/functions-aggregate.html
    **Teradata**
    https://docs.teradata.com/r/kmuOwjp1zEYg98JsB8fu_A/cPkFySIBORL~M938Zv07Cg
    **Snowflake**
    https://docs.snowflake.com/en/sql-reference/functions/percentile_disc.html
    **Oracle**
    https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/PERCENTILE_DISC.html#GUID-7C34FDDA-C241-474F-8C5C-50CC0182E005
    **DB2**
    https://www.ibm.com/docs/en/db2/11.5?topic=functions-percentile-disc
    **H2**
    http://www.h2database.com/html/functions-aggregate.html#percentile_disc
    **Sybase**
    https://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.infocenter.dc01776.1601/doc/html/san1278453110413.html
    **Exasol**
    https://docs.exasol.com/sql_references/functions/alphabeticallistfunctions/percentile_disc.htm
    **Redshift**
    https://docs.aws.amazon.com/redshift/latest/dg/r_APPROXIMATE_PERCENTILE_DISC.html
    **Yellowbrick**
    https://www.yellowbrick.com/docs/2.2/ybd_sqlref/percentile_disc.html
    **MariaDB**
    https://mariadb.com/kb/en/percentile_disc/
    **Phoenix**
    http://phoenix.incubator.apache.org/language/functions.html#percentile_disc
    **SingleStore**
    https://docs.singlestore.com/db/v7.6/en/reference/sql-reference/window-functions/percentile_disc.html
    
    This PR references the implementation of H2. Please refer to:
    https://github.com/h2database/h2database/blob/master/h2/src/main/org/h2/expression/aggregate/Percentile.java
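
    A minimal sketch of the new syntax (the inline data below is illustrative, not taken from this patch's test tables):

    SELECT
      percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
      percentile_cont(0.25) WITHIN GROUP (ORDER BY v)
    FROM VALUES (0), (10), (20), (30) AS tab(v);
    -- percentile_disc returns an actual value from the input column (no interpolation),
    -- while percentile_cont may interpolate between two adjacent input values.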
    
    ### Why are the changes needed?
    `PERCENTILE_DISC` is very useful. Exposing the expression makes migration from other systems to Spark SQL easier.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. New feature.
    
    ### How was this patch tested?
    New tests.
    
    Closes #35041 from beliefer/SPARK-37691.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-ref-ansi-compliance.md                    |   1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   3 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   7 +-
 .../expressions/aggregate/PercentileCont.scala     |  41 ----
 .../{Percentile.scala => percentiles.scala}        | 260 ++++++++++++++-------
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  18 +-
 .../expressions/aggregate/PercentileSuite.scala    |  13 +-
 .../sql/catalyst/parser/PlanParserSuite.scala      |  24 +-
 .../test/resources/sql-tests/inputs/group-by.sql   |  17 +-
 .../inputs/postgreSQL/aggregates_part4.sql         |   8 +-
 .../inputs/udf/postgreSQL/udf-aggregates_part4.sql |   8 +-
 .../src/test/resources/sql-tests/inputs/window.sql |  59 ++++-
 .../resources/sql-tests/results/group-by.sql.out   |  39 +++-
 .../results/postgreSQL/aggregates_part4.sql.out    |  31 ++-
 .../udf/postgreSQL/udf-aggregates_part4.sql.out    |  31 ++-
 .../resources/sql-tests/results/window.sql.out     | 184 +++++++++++----
 17 files changed, 540 insertions(+), 205 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 66161a112b1..89ba2d17608 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -512,6 +512,7 @@ Below is a list of all the keywords in Spark SQL.
 |PARTITIONS|non-reserved|non-reserved|non-reserved|
 |PERCENT|non-reserved|non-reserved|non-reserved|
 |PERCENTILE_CONT|reserved|non-reserved|non-reserved|
+|PERCENTILE_DISC|reserved|non-reserved|non-reserved|
 |PIVOT|non-reserved|non-reserved|non-reserved|
 |PLACING|non-reserved|non-reserved|non-reserved|
 |POSITION|non-reserved|non-reserved|reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index e84d4fa45eb..c5199a601ce 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -259,6 +259,7 @@ PARTITION: 'PARTITION';
 PARTITIONED: 'PARTITIONED';
 PARTITIONS: 'PARTITIONS';
 PERCENTILE_CONT: 'PERCENTILE_CONT';
+PERCENTILE_DISC: 'PERCENTILE_DISC';
 PERCENTLIT: 'PERCENT';
 PIVOT: 'PIVOT';
 PLACING: 'PLACING';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 872ea534ec3..6d465469f41 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -845,7 +845,7 @@ primaryExpression
        FROM srcStr=valueExpression RIGHT_PAREN                                 
                #trim
     | OVERLAY LEFT_PAREN input=valueExpression PLACING replace=valueExpression
       FROM position=valueExpression (FOR length=valueExpression)? RIGHT_PAREN  
                #overlay
-    | PERCENTILE_CONT LEFT_PAREN percentage=valueExpression RIGHT_PAREN
+    | name=(PERCENTILE_CONT | PERCENTILE_DISC) LEFT_PAREN percentage=valueExpression RIGHT_PAREN
       WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN ( OVER windowSpec)?                #percentile
     ;
 
@@ -1466,6 +1466,7 @@ nonReserved
     | PARTITIONED
     | PARTITIONS
     | PERCENTILE_CONT
+    | PERCENTILE_DISC
     | PERCENTLIT
     | PIVOT
     | PLACING
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 936dadc78c1..e9cf758ede2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, PercentileCont}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -228,10 +228,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             // Only allow window functions with an aggregate expression or an offset window
             // function or a Pandas window UDF.
             w.windowFunction match {
-              case AggregateExpression(_: PercentileCont, _, _, _, _)
+              case agg @ AggregateExpression(_: PercentileCont | _: PercentileDisc, _, _, _, _)
                 if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification !=
                     SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) =>
-                failAnalysis("Cannot specify order by or frame for 'PERCENTILE_CONT'.")
+                failAnalysis(
+                  s"Cannot specify order by or frame for '${agg.aggregateFunction.prettyName}'.")
               case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
                   _: AggregateWindowFunction => // OK
               case f: PythonUDF if PythonUDF.isWindowPandasUDF(f) => // OK
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala
deleted file mode 100644
index facb269ffe7..00000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes, RuntimeReplaceableAggregate}
-import org.apache.spark.sql.catalyst.trees.BinaryLike
-import org.apache.spark.sql.types.AbstractDataType
-
-/**
- * Return a percentile value based on a continuous distribution of
- * numeric or ansi interval column at the given percentage (specified in ORDER BY clause).
- * The value of percentage must be between 0.0 and 1.0.
- */
-case class PercentileCont(left: Expression, right: Expression)
-  extends AggregateFunction
-  with RuntimeReplaceableAggregate
-  with ImplicitCastInputTypes
-  with BinaryLike[Expression] {
-  private lazy val percentile = new Percentile(left, right)
-  override def replacement: Expression = percentile
-  override def nodeName: String = "percentile_cont"
-  override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes
-  override protected def withNewChildrenInternal(
-      newLeft: Expression, newRight: Expression): PercentileCont =
-    this.copy(left = newLeft, right = newRight)
-}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
similarity index 70%
rename from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
rename to 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 91bd7aa08fc..d55f0b7a831 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -24,93 +24,36 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.trees.TernaryLike
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection.OpenHashMap
 
-/**
- * The Percentile aggregate function returns the exact percentile(s) of numeric column `expr` at
- * the given percentage(s) with value range in [0.0, 1.0].
- *
- * Because the number of elements and their partial order cannot be determined in advance.
- * Therefore we have to store all the elements in memory, and so notice that too many elements can
- * cause GC paused and eventually OutOfMemory Errors.
- *
- * @param child child expression that produce numeric column value with `child.eval(inputRow)`
- * @param percentageExpression Expression that represents a single percentage value or an array of
- *                             percentage values. Each percentage value must be in the range
- *                             [0.0, 1.0].
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage =
-    """
-      _FUNC_(col, percentage [, frequency]) - Returns the exact percentile value of numeric
-       or ansi interval column `col` at the given percentage. The value of percentage must be
-       between 0.0 and 1.0. The value of frequency should be positive integral
+abstract class PercentileBase extends TypedImperativeAggregate[OpenHashMap[AnyRef, Long]]
+  with ImplicitCastInputTypes {
 
-      _FUNC_(col, array(percentage1 [, percentage2]...) [, frequency]) - Returns the exact
-      percentile value array of numeric column `col` at the given percentage(s). Each value
-      of the percentage array must be between 0.0 and 1.0. The value of frequency should be
-      positive integral
+  val child: Expression
+  val percentageExpression: Expression
+  val frequencyExpression : Expression
 
-      """,
-  examples = """
-    Examples:
-      > SELECT _FUNC_(col, 0.3) FROM VALUES (0), (10) AS tab(col);
-       3.0
-      > SELECT _FUNC_(col, array(0.25, 0.75)) FROM VALUES (0), (10) AS tab(col);
-       [2.5,7.5]
-      > SELECT _FUNC_(col, 0.5) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
-       5.0
-      > SELECT _FUNC_(col, array(0.2, 0.5)) FROM VALUES (INTERVAL '0' SECOND), (INTERVAL '10' SECOND) AS tab(col);
-       [2000000.0,5000000.0]
-  """,
-  group = "agg_funcs",
-  since = "2.1.0")
-// scalastyle:on line.size.limit
-case class Percentile(
-    child: Expression,
-    percentageExpression: Expression,
-    frequencyExpression : Expression,
-    mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0)
-  extends TypedImperativeAggregate[OpenHashMap[AnyRef, Long]] with ImplicitCastInputTypes
-  with TernaryLike[Expression] {
+  // Whether to calculate the percentile value in reverse (descending) order
+  val reverse: Boolean
 
-  def this(child: Expression, percentageExpression: Expression) = {
-    this(child, percentageExpression, Literal(1L), 0, 0)
-  }
-
-  def this(child: Expression, percentageExpression: Expression, frequency: Expression) = {
-    this(child, percentageExpression, frequency, 0, 0)
-  }
-
-  override def prettyName: String = "percentile"
-
-  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Percentile =
-    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
-
-  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): Percentile =
-    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  // Whether the percentile is computed on a discrete distribution (no interpolation)
+  protected def discrete: Boolean
 
   // Mark as lazy so that percentageExpression is not evaluated during tree transformation.
   @transient
   private lazy val returnPercentileArray = percentageExpression.dataType.isInstanceOf[ArrayType]
 
   @transient
-  private lazy val percentages = percentageExpression.eval() match {
+  protected lazy val percentages = percentageExpression.eval() match {
     case null => null
     case num: Double => Array(num)
     case arrayData: ArrayData => arrayData.toDoubleArray()
   }
 
-  override def first: Expression = child
-  override def second: Expression = percentageExpression
-  override def third: Expression = frequencyExpression
-
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
@@ -151,7 +94,7 @@ case class Percentile(
     }
   }
 
-  private def toDoubleValue(d: Any): Double = d match {
+  protected def toDoubleValue(d: Any): Double = d match {
     case d: Decimal => d.toDouble
     case n: Number => n.doubleValue
   }
@@ -204,7 +147,11 @@ case class Percentile(
       case intervalType: DayTimeIntervalType => intervalType.ordering
       case otherType => QueryExecutionErrors.unsupportedTypeError(otherType)
     }
-    val sortedCounts = buffer.toSeq.sortBy(_._1)(ordering.asInstanceOf[Ordering[AnyRef]])
+    val sortedCounts = if (reverse) {
+      buffer.toSeq.sortBy(_._1)(ordering.asInstanceOf[Ordering[AnyRef]].reverse)
+    } else {
+      buffer.toSeq.sortBy(_._1)(ordering.asInstanceOf[Ordering[AnyRef]])
+    }
     val accumulatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) {
       case ((key1, count1), (key2, count2)) => (key2, count1 + count2)
     }.tail
@@ -227,40 +174,44 @@ case class Percentile(
 
   /**
    * Get the percentile value.
-   *
    * This function has been based upon similar function from HIVE
    * `org.apache.hadoop.hive.ql.udf.UDAFPercentile.getPercentile()`.
    */
-  private def getPercentile(aggreCounts: Seq[(AnyRef, Long)], position: Double): Double = {
+  private def getPercentile(
+      accumulatedCounts: Seq[(AnyRef, Long)], position: Double): Double = {
     // We may need to do linear interpolation to get the exact percentile
     val lower = position.floor.toLong
     val higher = position.ceil.toLong
 
     // Use binary search to find the lower and the higher position.
-    val countsArray = aggreCounts.map(_._2).toArray[Long]
-    val lowerIndex = binarySearchCount(countsArray, 0, aggreCounts.size, lower + 1)
-    val higherIndex = binarySearchCount(countsArray, 0, aggreCounts.size, higher + 1)
+    val countsArray = accumulatedCounts.map(_._2).toArray[Long]
+    val lowerIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, lower + 1)
+    val higherIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, higher + 1)
 
-    val lowerKey = aggreCounts(lowerIndex)._1
+    val lowerKey = accumulatedCounts(lowerIndex)._1
     if (higher == lower) {
       // no interpolation needed because position does not have a fraction
       return toDoubleValue(lowerKey)
     }
 
-    val higherKey = aggreCounts(higherIndex)._1
+    val higherKey = accumulatedCounts(higherIndex)._1
     if (higherKey == lowerKey) {
       // no interpolation needed because lower position and higher position has the same key
       return toDoubleValue(lowerKey)
     }
 
-    // Linear interpolation to get the exact percentile
-    (higher - position) * toDoubleValue(lowerKey) + (position - lower) * toDoubleValue(higherKey)
+    if (discrete) {
+      toDoubleValue(lowerKey)
+    } else {
+      // Linear interpolation to get the exact percentile
+      (higher - position) * toDoubleValue(lowerKey) + (position - lower) * toDoubleValue(higherKey)
+    }
   }
 
   /**
   * use a binary search to find the index of the position closest to the current value.
    */
-  private def binarySearchCount(
+  protected def binarySearchCount(
       countsArray: Array[Long], start: Int, end: Int, value: Long): Int = {
     util.Arrays.binarySearch(countsArray, 0, end, value) match {
       case ix if ix < 0 => -(ix + 1)
@@ -317,6 +268,88 @@ case class Percentile(
       bis.close()
     }
   }
+}
+
+/**
+ * The Percentile aggregate function returns the exact percentile(s) of 
numeric column `expr` at
+ * the given percentage(s) with value range in [0.0, 1.0].
+ *
+ * Because the number of elements and their partial order cannot be determined 
in advance.
+ * Therefore we have to store all the elements in memory, and so notice that 
too many elements can
+ * cause GC paused and eventually OutOfMemory Errors.
+ *
+ * @param child child expression that produce numeric column value with 
`child.eval(inputRow)`
+ * @param percentageExpression Expression that represents a single percentage 
value or an array of
+ *                             percentage values. Each percentage value must 
be in the range
+ *                             [0.0, 1.0].
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, percentage [, frequency]) - Returns the exact percentile value of numeric
+       or ansi interval column `col` at the given percentage. The value of percentage must be
+       between 0.0 and 1.0. The value of frequency should be positive integral
+
+      _FUNC_(col, array(percentage1 [, percentage2]...) [, frequency]) - Returns the exact
+      percentile value array of numeric column `col` at the given percentage(s). Each value
+      of the percentage array must be between 0.0 and 1.0. The value of frequency should be
+      positive integral
+
+      """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col, 0.3) FROM VALUES (0), (10) AS tab(col);
+       3.0
+      > SELECT _FUNC_(col, array(0.25, 0.75)) FROM VALUES (0), (10) AS tab(col);
+       [2.5,7.5]
+      > SELECT _FUNC_(col, 0.5) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+       5.0
+      > SELECT _FUNC_(col, array(0.2, 0.5)) FROM VALUES (INTERVAL '0' SECOND), (INTERVAL '10' SECOND) AS tab(col);
+       [2000000.0,5000000.0]
+  """,
+  group = "agg_funcs",
+  since = "2.1.0")
+// scalastyle:on line.size.limit
+case class Percentile(
+    child: Expression,
+    percentageExpression: Expression,
+    frequencyExpression : Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0,
+    reverse: Boolean = false) extends PercentileBase with TernaryLike[Expression] {
+
+  def this(child: Expression, percentageExpression: Expression) = {
+    this(child, percentageExpression, Literal(1L), 0, 0)
+  }
+
+  def this(child: Expression, percentageExpression: Expression, frequency: Expression) = {
+    this(child, percentageExpression, frequency, 0, 0)
+  }
+
+  def this(child: Expression, percentageExpression: Expression, reverse: Boolean) = {
+    this(child, percentageExpression, Literal(1L), reverse = reverse)
+  }
+
+  override def first: Expression = child
+  override def second: Expression = percentageExpression
+  override def third: Expression = frequencyExpression
+
+  override def prettyName: String = "percentile"
+
+  override def discrete: Boolean = false
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Percentile =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): Percentile =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def stringArgs: Iterator[Any] = if (discrete) {
+    super.stringArgs ++ Some(discrete)
+  } else {
+    super.stringArgs
+  }
 
   override protected def withNewChildrenInternal(
       newFirst: Expression, newSecond: Expression, newThird: Expression): Percentile = copy(
@@ -325,3 +358,70 @@ case class Percentile(
     frequencyExpression = newThird
   )
 }
+
+/**
+ * Return a percentile value based on a continuous distribution of
+ * numeric or ansi interval column at the given percentage (specified in ORDER BY clause).
+ * The value of percentage must be between 0.0 and 1.0.
+ */
+case class PercentileCont(left: Expression, right: Expression, reverse: Boolean = false)
+  extends AggregateFunction
+  with RuntimeReplaceableAggregate
+  with ImplicitCastInputTypes
+  with BinaryLike[Expression] {
+  private lazy val percentile = new Percentile(left, right, reverse)
+  override def replacement: Expression = percentile
+  override def nodeName: String = "percentile_cont"
+  override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes
+  override def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else ""
+    val direction = if (reverse) " DESC" else ""
+    s"$prettyName($distinct${right.sql}) WITHIN GROUP (ORDER BY v$direction)"
+  }
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): PercentileCont =
+    this.copy(left = newLeft, right = newRight)
+}
+
+/**
+ * The Percentile aggregate function returns the percentile(s) based on a discrete distribution of
+ * numeric column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+ *
+ * Because the number of elements and their partial order cannot be determined in advance,
+ * we have to store all the elements in memory. Note that too many elements can
+ * cause GC pauses and eventually an OutOfMemoryError.
+ */
+case class PercentileDisc(
+    child: Expression,
+    percentageExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends PercentileBase with BinaryLike[Expression] {
+
+  val frequencyExpression: Expression = Literal(1L)
+
+  override def left: Expression = child
+  override def right: Expression = percentageExpression
+
+  override def prettyName: String = "percentile_disc"
+
+  override def discrete: Boolean = true
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): PercentileDisc =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): PercentileDisc =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else ""
+    val direction = if (reverse) " DESC" else ""
+    s"$prettyName($distinct${right.sql}) WITHIN GROUP (ORDER BY v$direction)"
+  }
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): PercentileDisc = copy(
+    child = newLeft,
+    percentageExpression = newRight
+  )
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 83ac8054c3d..4ffb4f1cfe1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableId
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last, PercentileCont}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last, PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -1841,11 +1841,19 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   override def visitPercentile(ctx: PercentileContext): Expression = withOrigin(ctx) {
     val percentage = expression(ctx.percentage)
     val sortOrder = visitSortItem(ctx.sortItem)
-    val percentileCont = sortOrder.direction match {
-      case Ascending => PercentileCont(sortOrder.child, percentage)
-      case Descending => PercentileCont(sortOrder.child, Subtract(Literal(1), percentage))
+    val percentile = ctx.name.getType match {
+      case SqlBaseParser.PERCENTILE_CONT =>
+        sortOrder.direction match {
+          case Ascending => PercentileCont(sortOrder.child, percentage)
+          case Descending => PercentileCont(sortOrder.child, percentage, true)
+        }
+      case SqlBaseParser.PERCENTILE_DISC =>
+        sortOrder.direction match {
+          case Ascending => PercentileDisc(sortOrder.child, percentage)
+          case Descending => PercentileDisc(sortOrder.child, percentage, true)
+        }
     }
-    val aggregateExpression = percentileCont.toAggregateExpression()
+    val aggregateExpression = percentile.toAggregateExpression()
     ctx.windowSpec match {
       case spec: WindowRefContext =>
         UnresolvedWindowExpression(aggregateExpression, visitWindowRef(spec))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
index b5882b1ab40..e08c6279fd6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
@@ -83,8 +83,8 @@ class PercentileSuite extends SparkFunSuite {
   }
 
   private def runTest(agg: Percentile,
-        rows : Seq[Seq[Any]],
-        expectedPercentiles : Seq[Double]): Unit = {
+      rows : Seq[Seq[Any]],
+      expectedPercentiles : Seq[Double]): Unit = {
     assert(agg.nullable)
     val group1 = (0 until rows.length / 2)
     val group1Buffer = agg.createAggregationBuffer()
@@ -218,7 +218,7 @@ class PercentileSuite extends SparkFunSuite {
       val percentile2 = new Percentile(child, percentage)
       assertEqual(percentile2.checkInputDataTypes(),
         TypeCheckFailure(s"Percentage(s) must be between 0.0 and 1.0, " +
-        s"but got ${percentage.simpleString(100)}"))
+          s"but got ${percentage.simpleString(100)}"))
     }
 
     val nonFoldablePercentage = Seq(NonFoldableLiteral(0.5),
@@ -270,7 +270,6 @@ class PercentileSuite extends SparkFunSuite {
   }
 
   test("nulls in percentage expression") {
-
     assert(new Percentile(
       AttributeReference("a", DoubleType)(),
      percentageExpression = Literal(null, DoubleType)).checkInputDataTypes() ===
@@ -280,9 +279,9 @@ class PercentileSuite extends SparkFunSuite {
      Seq(CreateArray(Seq(null).map(Literal(_))), CreateArray(Seq(0.1D, null).map(Literal(_))))
 
     nullPercentageExprs.foreach { percentageExpression =>
-        val wrongPercentage = new Percentile(
-          AttributeReference("a", DoubleType)(),
-          percentageExpression = percentageExpression)
+      val wrongPercentage = new Percentile(
+        AttributeReference("a", DoubleType)(),
+        percentageExpression = percentageExpression)
         assert(
           wrongPercentage.checkInputDataTypes() match {
            case TypeCheckFailure(msg) if msg.contains("argument 2 requires array<double>") => true
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 3d0d2fea7be..f511938f7aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, RelationTimeTravel, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.PercentileCont
+import org.apache.spark.sql.catalyst.expressions.aggregate.{PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
@@ -1299,24 +1299,36 @@ class PlanParserSuite extends AnalysisTest {
       "timestamp expression cannot contain subqueries")
   }
 
-  test("PERCENTILE_CONT function") {
-    def assertPercentileContPlans(inputSQL: String, expectedExpression: Expression): Unit = {
+  test("PERCENTILE_CONT & PERCENTILE_DISC") {
+    def assertPercentilePlans(inputSQL: String, expectedExpression: Expression): Unit = {
       comparePlans(
         parsePlan(inputSQL),
         Project(Seq(UnresolvedAlias(expectedExpression)), OneRowRelation())
       )
     }
 
-    assertPercentileContPlans(
+    assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col)",
       PercentileCont(UnresolvedAttribute("col"), Literal(Decimal(0.1), 
DecimalType(1, 1)))
         .toAggregateExpression()
     )
 
-    assertPercentileContPlans(
+    assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col DESC)",
       PercentileCont(UnresolvedAttribute("col"),
-        Subtract(Literal(1), Literal(Decimal(0.1), DecimalType(1, 1)))).toAggregateExpression()
+        Literal(Decimal(0.1), DecimalType(1, 1)), true).toAggregateExpression()
+    )
+
+    assertPercentilePlans(
+      "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col)",
+      PercentileDisc(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
+        .toAggregateExpression()
+    )
+
+    assertPercentilePlans(
+      "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col DESC)",
+      new PercentileDisc(UnresolvedAttribute("col"),
+        Literal(Decimal(0.1), DecimalType(1, 1)), true).toAggregateExpression()
     )
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index 0fc9366a92a..dd10d7b5d1c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -278,8 +278,8 @@ SELECT k, regr_sxx(y, x) FROM testRegression WHERE x IS NOT NULL AND y IS NOT NU
 
 -- SPARK-37676: Support ANSI Aggregation Function: percentile_cont
 SELECT
- percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
- percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC)
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC)
 FROM aggr;
 SELECT
   k,
@@ -288,3 +288,16 @@ SELECT
 FROM aggr
 GROUP BY k
 ORDER BY k;
+
+-- SPARK-37691: Support ANSI Aggregation Function: percentile_disc
+SELECT
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC)
+FROM aggr;
+SELECT
+  k,
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC)
+FROM aggr
+GROUP BY k
+ORDER BY k;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql
index 0d255bed24e..3cf57b1f0e5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql
@@ -33,10 +33,10 @@
 --      (values (0::float8),(0.1),(0.25),(0.4),(0.5),(0.6),(0.75),(0.9),(1)) v(p)
 -- group by p order by p;
 
--- select percentile_cont(0.5) within group (order by b) from aggtest;
--- select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
--- select percentile_cont(0.5) within group (order by thousand) from tenk1;
--- select percentile_disc(0.5) within group (order by thousand) from tenk1;
+select percentile_cont(0.5) within group (order by b) from aggtest;
+select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
+select percentile_cont(0.5) within group (order by thousand) from tenk1;
+select percentile_disc(0.5) within group (order by thousand) from tenk1;
 -- [SPARK-28661] Hypothetical-Set Aggregate Functions
 -- select rank(3) within group (order by x)
 -- from (values (1),(1),(2),(2),(3),(3),(4)) v(x);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql
index 8aea00073ee..dd14a7db0db 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql
@@ -30,10 +30,10 @@
 --      (values (0::float8),(0.1),(0.25),(0.4),(0.5),(0.6),(0.75),(0.9),(1)) v(p)
 -- group by p order by p;
 
--- select percentile_cont(0.5) within group (order by b) from aggtest;
--- select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
--- select percentile_cont(0.5) within group (order by thousand) from tenk1;
--- select percentile_disc(0.5) within group (order by thousand) from tenk1;
+select percentile_cont(0.5) within group (order by b) from aggtest;
+select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
+select percentile_cont(0.5) within group (order by thousand) from tenk1;
+select percentile_disc(0.5) within group (order by thousand) from tenk1;
 -- [SPARK-28661] Hypothetical-Set Aggregate Functions
 -- select rank(3) within group (order by x)
 -- from (values (1),(1),(2),(2),(3),(3),(4)) v(x);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index 29dc6980b65..e982683250c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -446,7 +446,9 @@ SELECT
     department,
     salary,
     percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
-    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department)
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
+    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department),
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department)
 FROM basic_pays
 ORDER BY salary;
 
@@ -459,6 +461,15 @@ SELECT
 FROM basic_pays
 ORDER BY salary;
 
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER BY salary),
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary)
+FROM basic_pays
+ORDER BY salary;
+
 SELECT
     employee_name,
     department,
@@ -468,16 +479,40 @@ SELECT
 FROM basic_pays
 ORDER BY salary;
 
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING),
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+FROM basic_pays
+ORDER BY salary;
+
 SELECT
     employee_name,
     department,
     salary,
     percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
-    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
 FROM basic_pays
 WINDOW w AS (PARTITION BY department)
 ORDER BY salary;
 
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_cont(0.5) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_disc(0.5) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_cont(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w,
+    percentile_disc(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w
+FROM basic_pays
+WHERE salary > 8900
+WINDOW w AS (PARTITION BY department)
+ORDER BY salary;
+
 SELECT
     employee_name,
     department,
@@ -488,6 +523,16 @@ FROM basic_pays
 WINDOW w AS (PARTITION BY department ORDER BY salary)
 ORDER BY salary;
 
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+FROM basic_pays
+WINDOW w AS (PARTITION BY department ORDER BY salary)
+ORDER BY salary;
+
 SELECT
     employee_name,
     department,
@@ -497,3 +542,13 @@ SELECT
 FROM basic_pays
 WINDOW w AS (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
 ORDER BY salary;
+
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+FROM basic_pays
+WINDOW w AS (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+ORDER BY salary;
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index 75bf4f1c2f7..bad431383ff 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 103
+-- Number of queries: 105
 
 
 -- !query
@@ -1007,11 +1007,11 @@ struct<k:int,regr_sxx(y, x):double>
 
 -- !query
 SELECT
- percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
- percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC)
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC)
 FROM aggr
 -- !query schema
-struct<percentile_cont(v, 0.25):double,percentile_cont(v, (1 - 0.25)):double>
+struct<percentile_cont(0.25) WITHIN GROUP (ORDER BY v):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC):double>
 -- !query output
 10.0   30.0
 
@@ -1025,10 +1025,39 @@ FROM aggr
 GROUP BY k
 ORDER BY k
 -- !query schema
-struct<k:int,percentile_cont(v, 0.25):double,percentile_cont(v, (1 - 0.25)):double>
+struct<k:int,percentile_cont(0.25) WITHIN GROUP (ORDER BY v):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC):double>
 -- !query output
 0      10.0    30.0
 1      12.5    17.5
 2      17.5    26.25
 3      60.0    60.0
 4      NULL    NULL
+
+
+-- !query
+SELECT
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC)
+FROM aggr
+-- !query schema
+struct<percentile_disc(0.25) WITHIN GROUP (ORDER BY v):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC):double>
+-- !query output
+10.0   30.0
+
+
+-- !query
+SELECT
+  k,
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC)
+FROM aggr
+GROUP BY k
+ORDER BY k
+-- !query schema
+struct<k:int,percentile_disc(0.25) WITHIN GROUP (ORDER BY v):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC):double>
+-- !query output
+0      10.0    30.0
+1      10.0    20.0
+2      10.0    30.0
+3      60.0    60.0
+4      NULL    NULL
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out
index b7bbdc50dfb..8c21a5067bf 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out
@@ -1,5 +1,34 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 0
+-- Number of queries: 4
 
 
+-- !query
+select percentile_cont(0.5) within group (order by b) from aggtest
+-- !query schema
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double>
+-- !query output
+53.44850015640259
 
+
+-- !query
+select percentile_cont(0.5) within group (order by b), sum(b) from aggtest
+-- !query schema
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double,sum(b):double>
+-- !query output
+53.44850015640259      431.77260909229517
+
+
+-- !query
+select percentile_cont(0.5) within group (order by thousand) from tenk1
+-- !query schema
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double>
+-- !query output
+499.5
+
+
+-- !query
+select percentile_disc(0.5) within group (order by thousand) from tenk1
+-- !query schema
+struct<percentile_disc(0.5) WITHIN GROUP (ORDER BY v):double>
+-- !query output
+499.0
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out
index b7bbdc50dfb..8c21a5067bf 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out
@@ -1,5 +1,34 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 0
+-- Number of queries: 4
 
 
+-- !query
+select percentile_cont(0.5) within group (order by b) from aggtest
+-- !query schema
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double>
+-- !query output
+53.44850015640259
 
+
+-- !query
+select percentile_cont(0.5) within group (order by b), sum(b) from aggtest
+-- !query schema
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double,sum(b):double>
+-- !query output
+53.44850015640259      431.77260909229517
+
+
+-- !query
+select percentile_cont(0.5) within group (order by thousand) from tenk1
+-- !query schema
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double>
+-- !query output
+499.5
+
+
+-- !query
+select percentile_disc(0.5) within group (order by thousand) from tenk1
+-- !query schema
+struct<percentile_disc(0.5) WITHIN GROUP (ORDER BY v):double>
+-- !query output
+499.0
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index bac00d85c67..7e0fd2772d8 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 61
+-- Number of queries: 66
 
 
 -- !query
@@ -1205,29 +1205,31 @@ SELECT
     department,
     salary,
     percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
-    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department)
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
+    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department),
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department)
 FROM basic_pays
 ORDER BY salary
 -- !query schema
-struct<employee_name:string,department:string,salary:int,percentile_cont(salary, 0.25) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(salary, (1 - 0.25)) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
+struct<employee_name:string,department:string,salary:int,percentile_cont(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,p [...]
 -- !query output
-Leslie Thompson        IT      5186    5917.75 7381.25
-Anthony Bow    Accounting      6627    8543.75 9746.5
-Foon Yue Tseng Sales   6660    8550.75 9721.5
-Gerard Hernandez       SCM     6949    10449.0 11303.0
-Leslie Jennings        IT      8113    5917.75 7381.25
-Diane Murphy   Accounting      8435    8543.75 9746.5
-William Patterson      Accounting      8870    8543.75 9746.5
-Jeff Firrelli  Accounting      8992    8543.75 9746.5
-Julie Firrelli Sales   9181    8550.75 9721.5
-Steve Patterson        Sales   9441    8550.75 9721.5
-Mary Patterson Accounting      9998    8543.75 9746.5
-Loui Bondur    SCM     10449   10449.0 11303.0
-George Vanauf  Sales   10563   8550.75 9721.5
-Barry Jones    SCM     10586   10449.0 11303.0
-Pamela Castillo        SCM     11303   10449.0 11303.0
-Gerard Bondur  Accounting      11472   8543.75 9746.5
-Larry Bott     SCM     11798   10449.0 11303.0
+Leslie Thompson        IT      5186    5917.75 5186.0  7381.25 8113.0
+Anthony Bow    Accounting      6627    8543.75 8435.0  9746.5  9998.0
+Foon Yue Tseng Sales   6660    8550.75 6660.0  9721.5  10563.0
+Gerard Hernandez       SCM     6949    10449.0 10449.0 11303.0 11303.0
+Leslie Jennings        IT      8113    5917.75 5186.0  7381.25 8113.0
+Diane Murphy   Accounting      8435    8543.75 8435.0  9746.5  9998.0
+William Patterson      Accounting      8870    8543.75 8435.0  9746.5  9998.0
+Jeff Firrelli  Accounting      8992    8543.75 8435.0  9746.5  9998.0
+Julie Firrelli Sales   9181    8550.75 6660.0  9721.5  10563.0
+Steve Patterson        Sales   9441    8550.75 6660.0  9721.5  10563.0
+Mary Patterson Accounting      9998    8543.75 8435.0  9746.5  9998.0
+Loui Bondur    SCM     10449   10449.0 10449.0 11303.0 11303.0
+George Vanauf  Sales   10563   8550.75 6660.0  9721.5  10563.0
+Barry Jones    SCM     10586   10449.0 10449.0 11303.0 11303.0
+Pamela Castillo        SCM     11303   10449.0 10449.0 11303.0 11303.0
+Gerard Bondur  Accounting      11472   8543.75 8435.0  9746.5  9998.0
+Larry Bott     SCM     11798   10449.0 10449.0 11303.0 11303.0
 
 
 -- !query
@@ -1243,7 +1245,23 @@ ORDER BY salary
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Cannot specify order by or frame for 'PERCENTILE_CONT'.
+Cannot specify order by or frame for 'percentile_cont'.
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER BY salary),
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary)
+FROM basic_pays
+ORDER BY salary
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Cannot specify order by or frame for 'percentile_disc'.
 
 
 -- !query
@@ -1259,7 +1277,23 @@ ORDER BY salary
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Cannot specify order by or frame for 'PERCENTILE_CONT'.
+Cannot specify order by or frame for 'percentile_cont'.
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING),
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+FROM basic_pays
+ORDER BY salary
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Cannot specify order by or frame for 'percentile_disc'.
 
 
 -- !query
@@ -1268,30 +1302,60 @@ SELECT
     department,
     salary,
     percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
-    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
 FROM basic_pays
 WINDOW w AS (PARTITION BY department)
 ORDER BY salary
 -- !query schema
-struct<employee_name:string,department:string,salary:int,percentile_cont(salary, 0.25) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(salary, (1 - 0.25)) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
+struct<employee_name:string,department:string,salary:int,percentile_cont(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,p [...]
 -- !query output
-Leslie Thompson        IT      5186    5917.75 7381.25
-Anthony Bow    Accounting      6627    8543.75 9746.5
-Foon Yue Tseng Sales   6660    8550.75 9721.5
-Gerard Hernandez       SCM     6949    10449.0 11303.0
-Leslie Jennings        IT      8113    5917.75 7381.25
-Diane Murphy   Accounting      8435    8543.75 9746.5
-William Patterson      Accounting      8870    8543.75 9746.5
-Jeff Firrelli  Accounting      8992    8543.75 9746.5
-Julie Firrelli Sales   9181    8550.75 9721.5
-Steve Patterson        Sales   9441    8550.75 9721.5
-Mary Patterson Accounting      9998    8543.75 9746.5
-Loui Bondur    SCM     10449   10449.0 11303.0
-George Vanauf  Sales   10563   8550.75 9721.5
-Barry Jones    SCM     10586   10449.0 11303.0
-Pamela Castillo        SCM     11303   10449.0 11303.0
-Gerard Bondur  Accounting      11472   8543.75 9746.5
-Larry Bott     SCM     11798   10449.0 11303.0
+Leslie Thompson        IT      5186    5917.75 5186.0  7381.25 8113.0
+Anthony Bow    Accounting      6627    8543.75 8435.0  9746.5  9998.0
+Foon Yue Tseng Sales   6660    8550.75 6660.0  9721.5  10563.0
+Gerard Hernandez       SCM     6949    10449.0 10449.0 11303.0 11303.0
+Leslie Jennings        IT      8113    5917.75 5186.0  7381.25 8113.0
+Diane Murphy   Accounting      8435    8543.75 8435.0  9746.5  9998.0
+William Patterson      Accounting      8870    8543.75 8435.0  9746.5  9998.0
+Jeff Firrelli  Accounting      8992    8543.75 8435.0  9746.5  9998.0
+Julie Firrelli Sales   9181    8550.75 6660.0  9721.5  10563.0
+Steve Patterson        Sales   9441    8550.75 6660.0  9721.5  10563.0
+Mary Patterson Accounting      9998    8543.75 8435.0  9746.5  9998.0
+Loui Bondur    SCM     10449   10449.0 10449.0 11303.0 11303.0
+George Vanauf  Sales   10563   8550.75 6660.0  9721.5  10563.0
+Barry Jones    SCM     10586   10449.0 10449.0 11303.0 11303.0
+Pamela Castillo        SCM     11303   10449.0 10449.0 11303.0 11303.0
+Gerard Bondur  Accounting      11472   8543.75 8435.0  9746.5  9998.0
+Larry Bott     SCM     11798   10449.0 10449.0 11303.0 11303.0
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_cont(0.5) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_disc(0.5) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_cont(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w,
+    percentile_disc(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w
+FROM basic_pays
+WHERE salary > 8900
+WINDOW w AS (PARTITION BY department)
+ORDER BY salary
+-- !query schema
+struct<employee_name:string,department:string,salary:int,percentile_cont(0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_disc(0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(0.5) WITHIN GROUP (ORDER BY v DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,perc [...]
+-- !query output
+Jeff Firrelli  Accounting      8992    9998.0  9998.0  9998.0  9998.0
+Julie Firrelli Sales   9181    9441.0  9441.0  9441.0  9441.0
+Steve Patterson        Sales   9441    9441.0  9441.0  9441.0  9441.0
+Mary Patterson Accounting      9998    9998.0  9998.0  9998.0  9998.0
+Loui Bondur    SCM     10449   10944.5 10586.0 10944.5 11303.0
+George Vanauf  Sales   10563   9441.0  9441.0  9441.0  9441.0
+Barry Jones    SCM     10586   10944.5 10586.0 10944.5 11303.0
+Pamela Castillo        SCM     11303   10944.5 10586.0 10944.5 11303.0
+Gerard Bondur  Accounting      11472   9998.0  9998.0  9998.0  9998.0
+Larry Bott     SCM     11798   10944.5 10586.0 10944.5 11303.0
 
 
 -- !query
@@ -1308,7 +1372,24 @@ ORDER BY salary
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Cannot specify order by or frame for 'PERCENTILE_CONT'.
+Cannot specify order by or frame for 'percentile_cont'.
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+FROM basic_pays
+WINDOW w AS (PARTITION BY department ORDER BY salary)
+ORDER BY salary
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Cannot specify order by or frame for 'percentile_disc'.
 
 
 -- !query
@@ -1325,4 +1406,21 @@ ORDER BY salary
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Cannot specify order by or frame for 'PERCENTILE_CONT'.
+Cannot specify order by or frame for 'percentile_cont'.
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+    percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+FROM basic_pays
+WINDOW w AS (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+ORDER BY salary
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Cannot specify order by or frame for 'percentile_disc'.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
