This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1283c126878 [SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall
f1283c126878 is described below

commit f1283c12687853f9cd190f8db69d97abe16a2d88
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Tue Dec 5 09:28:30 2023 +0800

    [SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall
    
    ### What changes were proposed in this pull request?
    The Spark SQL parser has a special rule for parsing `[percentile_cont|percentile_disc](percentage) WITHIN GROUP (ORDER BY v)`.
    We should merge this rule into the generic `functionCall` rule.
    
    ### Why are the changes needed?
    Merging the dedicated parse rule for `PercentileCont` and `PercentileDisc` into `functionCall` simplifies the grammar and routes these functions through the standard function-resolution path.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    New test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43910 from beliefer/SPARK-46009.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
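
    For illustration, a minimal sketch (assuming an active `SparkSession` named
    `spark`; expected values taken from the ExpressionDescription examples added
    in this patch) of the syntax whose parsing now goes through `functionCall`:

        val df = spark.sql(
          """SELECT
            |  percentile_cont(0.25) WITHIN GROUP (ORDER BY col),
            |  percentile_disc(0.25) WITHIN GROUP (ORDER BY col)
            |FROM VALUES (0), (10) AS tab(col)""".stripMargin)
        df.show()
        // percentile_cont(0.25) WITHIN GROUP (ORDER BY col) -> 2.5
        // percentile_disc(0.25) WITHIN GROUP (ORDER BY col) -> 0.0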
---
 common/utils/src/main/resources/error/README.md    |   1 +
 .../src/main/resources/error/error-classes.json    |  23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   7 +-
 ...id-inverse-distribution-function-error-class.md |  40 +++
 docs/sql-error-conditions.md                       |   8 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  29 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +
 .../spark/sql/catalyst/analysis/unresolved.scala   |  32 ++-
 .../aggregate/SupportsOrderingWithinGroup.scala    |  27 ++
 .../expressions/aggregate/percentiles.scala        |  81 +++++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  34 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  24 ++
 .../sql/catalyst/parser/PlanParserSuite.scala      |  49 +++-
 .../sql-functions/sql-expression-schema.md         |   2 +
 .../sql-tests/analyzer-results/percentiles.sql.out | 275 +++++++++++++++++++
 .../resources/sql-tests/inputs/percentiles.sql     |  48 ++++
 .../sql-tests/results/percentiles.sql.out          | 299 +++++++++++++++++++++
 18 files changed, 922 insertions(+), 63 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md
index c9fdd84e7442..556a634e9927 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1309,6 +1309,7 @@ The following SQLSTATEs are collated from:
 |HZ320    |HZ   |RDA-specific condition                            |320     |version not supported                                       |RDA/SQL        |Y       |RDA/SQL                                                                     |
 |HZ321    |HZ   |RDA-specific condition                            |321     |TCP/IP error                                                |RDA/SQL        |Y       |RDA/SQL                                                                     |
 |HZ322    |HZ   |RDA-specific condition                            |322     |TLS alert                                                   |RDA/SQL        |Y       |RDA/SQL                                                                     |
+|ID001    |ID   |Invalid inverse distribution function             |001     |Invalid inverse distribution function                       |SQL/Foundation |N       |SQL/Foundation PostgreSQL Oracle Snowflake Redshift H2                      |
 |IM001    |IM   |ODBC driver                                       |001     |Driver does not support this function                       |SQL Server     |N       |SQL Server                                                                  |
 |IM002    |IM   |ODBC driver                                       |002     |Data source name not found and no default driver specified  |SQL Server     |N       |SQL Server                                                                  |
 |IM003    |IM   |ODBC driver                                       |003     |Specified driver could not be loaded                        |SQL Server     |N       |SQL Server                                                                  |
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 6795ebcb0bd0..a808be9510cf 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1858,6 +1858,29 @@
     },
     "sqlState" : "42000"
   },
+  "INVALID_INVERSE_DISTRIBUTION_FUNCTION" : {
+    "message" : [
+      "Invalid inverse distribution function <funcName>."
+    ],
+    "subClass" : {
+      "DISTINCT_UNSUPPORTED" : {
+        "message" : [
+          "Cannot use DISTINCT with WITHIN GROUP."
+        ]
+      },
+      "WITHIN_GROUP_MISSING" : {
+        "message" : [
+          "WITHIN GROUP is required for inverse distribution function."
+        ]
+      },
+      "WRONG_NUM_ORDERINGS" : {
+        "message" : [
+          "Requires <expectedNum> orderings in WITHIN GROUP but got <actualNum>."
+        ]
+      }
+    },
+    "sqlState" : "ID001"
+  },
   "INVALID_JSON_ROOT_FIELD" : {
     "message" : [
       "Cannot convert JSON root field to target Spark type."
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dc1730c78267..5c73f07a3b0e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1714,7 +1714,7 @@ class SparkConnectPlanner(
         children(2) match {
           case Literal(b, BinaryType) if b != null =>
             (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
-          case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
+          case UnresolvedFunction(Seq("map"), arguments, _, _, _, _) =>
             (None, ExprUtils.convertToMapData(CreateMap(arguments)))
           case other =>
             throw InvalidPlanInput(
@@ -1730,7 +1730,7 @@ class SparkConnectPlanner(
              s"DescFilePath in $functionName should be a literal binary, but got $other")
         }
         val map = children(3) match {
-          case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
+          case UnresolvedFunction(Seq("map"), arguments, _, _, _, _) =>
             ExprUtils.convertToMapData(CreateMap(arguments))
           case other =>
             throw InvalidPlanInput(
@@ -2064,7 +2064,8 @@ class SparkConnectPlanner(
   @scala.annotation.tailrec
  private def extractMapData(expr: Expression, field: String): Map[String, String] = expr match {
     case map: CreateMap => ExprUtils.convertToMapData(map)
-    case UnresolvedFunction(Seq("map"), args, _, _, _) => extractMapData(CreateMap(args), field)
+    case UnresolvedFunction(Seq("map"), args, _, _, _, _) =>
+      extractMapData(CreateMap(args), field)
    case other => throw InvalidPlanInput(s"$field should be created by map, but got $other")
   }
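
Context for the pattern changes above: UnresolvedFunction gains a sixth field, `orderingWithinGroup: Seq[SortOrder]` (see unresolved.scala later in this diff), so every `UnresolvedFunction(...)` extractor needs one more wildcard. A hedged, illustrative match (hypothetical helper, not part of this patch):

    import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Illustrative only: the sixth binder is the new WITHIN GROUP ordering.
    def describe(e: Expression): String = e match {
      case UnresolvedFunction(nameParts, args, _, _, _, ordering) =>
        s"${nameParts.mkString(".")}: ${args.length} args, ${ordering.length} orderings"
      case _ => "not an unresolved function"
    }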
 
diff --git a/docs/sql-error-conditions-invalid-inverse-distribution-function-error-class.md b/docs/sql-error-conditions-invalid-inverse-distribution-function-error-class.md
new file mode 100644
index 000000000000..d2091c72f43c
--- /dev/null
+++ b/docs/sql-error-conditions-invalid-inverse-distribution-function-error-class.md
@@ -0,0 +1,40 @@
+---
+layout: global
+title: INVALID_INVERSE_DISTRIBUTION_FUNCTION error class
+displayTitle: INVALID_INVERSE_DISTRIBUTION_FUNCTION error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: ID001
+
+Invalid inverse distribution function `<funcName>`.
+
+This error class has the following derived error classes:
+
+## DISTINCT_UNSUPPORTED
+
+Cannot use DISTINCT with WITHIN GROUP.
+
+## WITHIN_GROUP_MISSING
+
+WITHIN GROUP is required for inverse distribution function.
+
+## WRONG_NUM_ORDERINGS
+
+Requires `<expectedNum>` orderings in WITHIN GROUP but got `<actualNum>`.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 1943f8359572..8074ba9233d0 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1125,6 +1125,14 @@ Invalid inline table.
 
 For more details see [INVALID_INLINE_TABLE](sql-error-conditions-invalid-inline-table-error-class.html)
 
+### [INVALID_INVERSE_DISTRIBUTION_FUNCTION](sql-error-conditions-invalid-inverse-distribution-function-error-class.html)
+
+SQLSTATE: ID001
+
+Invalid inverse distribution function `<funcName>`.
+
+For more details see [INVALID_INVERSE_DISTRIBUTION_FUNCTION](sql-error-conditions-invalid-inverse-distribution-function-error-class.html)
+
 ### INVALID_JSON_ROOT_FIELD
 
 [SQLSTATE: 22032](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 439a12c30181..0ab3c5ac0c36 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -979,6 +979,7 @@ primaryExpression
     | LEFT_PAREN query RIGHT_PAREN                                                             #subqueryExpression
     | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
        (COMMA argument+=functionArgument)*)? RIGHT_PAREN
+       (WITHIN GROUP LEFT_PAREN ORDER BY sortItem (COMMA sortItem)* RIGHT_PAREN)?
        (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)?
        (nullsOption=(IGNORE | RESPECT) NULLS)? ( OVER windowSpec)?                             #functionCall
     | identifier ARROW expression                                                              #lambda
@@ -994,9 +995,6 @@ primaryExpression
        FROM srcStr=valueExpression RIGHT_PAREN                                                 #trim
     | OVERLAY LEFT_PAREN input=valueExpression PLACING replace=valueExpression
       FROM position=valueExpression (FOR length=valueExpression)? RIGHT_PAREN                  #overlay
-    | name=(PERCENTILE_CONT | PERCENTILE_DISC) LEFT_PAREN percentage=valueExpression RIGHT_PAREN
-        WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN
-        (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? ( OVER windowSpec)?     #percentile
     ;
 
 literalType
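
With the dedicated #percentile alternative removed, `WITHIN GROUP (ORDER BY ...)` now parses as an optional clause of any #functionCall; whether a given function actually supports it is checked during analysis rather than parsing. A hedged sketch (the `aggr` table comes from the test inputs below):

    // Parses, then resolves through FunctionRegistry and the analyzer:
    spark.sql("SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v) FROM aggr")
    // Also parses, but the analyzer rejects it (see the new golden files below):
    spark.sql("SELECT round(v, 0) WITHIN GROUP (ORDER BY v) FROM aggr")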
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6ddc3cbbd80f..e5961b46e743 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2047,7 +2047,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       val externalFunctionNameSet = new mutable.HashSet[Seq[String]]()
 
       plan.resolveExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
-        case f @ UnresolvedFunction(nameParts, _, _, _, _) =>
+        case f @ UnresolvedFunction(nameParts, _, _, _, _, _) =>
           if (ResolveFunctions.lookupBuiltinOrTempFunction(nameParts).isDefined) {
             f
           } else {
@@ -2207,7 +2207,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         q.transformExpressionsUpWithPruning(
           _.containsAnyPattern(UNRESOLVED_FUNCTION, GENERATOR),
           ruleId) {
-          case u @ UnresolvedFunction(nameParts, arguments, _, _, _)
+          case u @ UnresolvedFunction(nameParts, arguments, _, _, _, _)
               if hasLambdaAndResolvedArguments(arguments) => withPosition(u) {
             resolveBuiltinOrTempFunction(nameParts, arguments, Some(u)).map {
               case func: HigherOrderFunction => func
@@ -2236,7 +2236,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
             }
           }
 
-          case u @ UnresolvedFunction(nameParts, arguments, _, _, _) => withPosition(u) {
+          case u @ UnresolvedFunction(nameParts, arguments, _, _, _, _) => withPosition(u) {
             resolveBuiltinOrTempFunction(nameParts, arguments, Some(u)).getOrElse {
               val CatalogAndIdentifier(catalog, ident) = expandIdentifier(nameParts)
               if (CatalogV2Util.isSessionCatalog(catalog)) {
@@ -2329,6 +2329,16 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         numArgs: Int,
         u: UnresolvedFunction): Expression = {
       func match {
+        case owg: SupportsOrderingWithinGroup if u.isDistinct =>
+          throw QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError(
+            owg.prettyName)
+        case owg: SupportsOrderingWithinGroup if u.orderingWithinGroup.isEmpty =>
+          throw QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError(
+            owg.prettyName)
+        case f
+          if !f.isInstanceOf[SupportsOrderingWithinGroup] && u.orderingWithinGroup.nonEmpty =>
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            func.prettyName, "WITHIN GROUP (ORDER BY ...)")
         // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
         // the context of a Window clause. They do not need to be wrapped in an
         // AggregateExpression.
@@ -2371,6 +2381,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         case agg: AggregateFunction =>
           // Note: PythonUDAF does not support these advanced clauses.
           if (agg.isInstanceOf[PythonUDAF]) checkUnsupportedAggregateClause(agg, u)
+          // After parsing, inverse distribution functions do not have their ordering set yet.
+          val newAgg = agg match {
+            case owg: SupportsOrderingWithinGroup if u.orderingWithinGroup.nonEmpty =>
+              owg.withOrderingWithinGroup(u.orderingWithinGroup)
+            case _ =>
+              agg
+          }
 
           u.filter match {
             case Some(filter) if !filter.deterministic =>
@@ -2384,17 +2401,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
             case _ =>
           }
           if (u.ignoreNulls) {
-            val aggFunc = agg match {
+            val aggFunc = newAgg match {
               case first: First => first.copy(ignoreNulls = u.ignoreNulls)
               case last: Last => last.copy(ignoreNulls = u.ignoreNulls)
               case any_value: AnyValue => any_value.copy(ignoreNulls = u.ignoreNulls)
               case _ =>
                 throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
-                  agg.prettyName, "IGNORE NULLS")
+                  newAgg.prettyName, "IGNORE NULLS")
             }
             aggFunc.toAggregateExpression(u.isDistinct, u.filter)
           } else {
-            agg.toAggregateExpression(u.isDistinct, u.filter)
+            newAgg.toAggregateExpression(u.isDistinct, u.filter)
           }
         // This function is not an aggregate function, just return the resolved one.
         case other =>
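
Each new guard above maps to a user-visible error condition; hedged examples, with expected error classes taken from the new golden files below:

    spark.sql("SELECT percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v) FROM aggr")
    // -> INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED
    spark.sql("SELECT percentile_cont(0.5) FROM aggr")
    // -> INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING
    spark.sql("SELECT round(v, 0) WITHIN GROUP (ORDER BY v) FROM aggr")
    // -> _LEGACY_ERROR_TEMP_1023 (WITHIN GROUP on a non-supporting function)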
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index c8b0e8be4c07..016f764c7002 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -478,6 +478,8 @@ object FunctionRegistry {
     expression[Min]("min"),
     expression[MinBy]("min_by"),
     expression[Percentile]("percentile"),
+    expressionBuilder("percentile_cont", PercentileContBuilder),
+    expressionBuilder("percentile_disc", PercentileDiscBuilder),
     expression[Median]("median"),
     expression[Skewness]("skewness"),
     expression[ApproximatePercentile]("percentile_approx"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 9342d29245aa..97912fb5d592 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -300,11 +300,12 @@ case class UnresolvedFunction(
     arguments: Seq[Expression],
     isDistinct: Boolean,
     filter: Option[Expression] = None,
-    ignoreNulls: Boolean = false)
+    ignoreNulls: Boolean = false,
+    orderingWithinGroup: Seq[SortOrder] = Seq.empty)
   extends Expression with Unevaluable {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
-  override def children: Seq[Expression] = arguments ++ filter.toSeq
+  override def children: Seq[Expression] = arguments ++ filter.toSeq ++ orderingWithinGroup
 
   override def dataType: DataType = throw new UnresolvedException("dataType")
   override def nullable: Boolean = throw new UnresolvedException("nullable")
@@ -320,9 +321,22 @@ case class UnresolvedFunction(
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[Expression]): UnresolvedFunction = {
     if (filter.isDefined) {
-      copy(arguments = newChildren.dropRight(1), filter = Some(newChildren.last))
-    } else {
+      if (orderingWithinGroup.isEmpty) {
+        copy(arguments = newChildren.dropRight(1), filter = Some(newChildren.last))
+      } else {
+        val nonArgs = newChildren.takeRight(orderingWithinGroup.length + 1)
+        val newSortOrders = nonArgs.tail.asInstanceOf[Seq[SortOrder]]
+        val newFilter = Some(nonArgs.head)
+        val newArgs = newChildren.dropRight(orderingWithinGroup.length + 1)
+        copy(arguments = newArgs, filter = newFilter, orderingWithinGroup = newSortOrders)
+      }
+    } else if (orderingWithinGroup.isEmpty) {
       copy(arguments = newChildren)
+    } else {
+      val newSortOrders =
+        newChildren.takeRight(orderingWithinGroup.length).asInstanceOf[Seq[SortOrder]]
+      val newArgs = newChildren.dropRight(orderingWithinGroup.length)
+      copy(arguments = newArgs, orderingWithinGroup = newSortOrders)
     }
   }
 }
@@ -854,3 +868,13 @@ case class TempResolvedColumn(
     copy(child = newChild)
   final override val nodePatterns: Seq[TreePattern] = Seq(TEMP_RESOLVED_COLUMN)
 }
+
+/**
+ * A placeholder expression used in inverse distribution functions;
+ * it will be replaced during analysis.
+ */
+case object UnresolvedWithinGroup extends LeafExpression with Unevaluable {
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+  override def dataType: DataType = throw new UnresolvedException("dataType")
+  override lazy val resolved = false
+}
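
The children layout assumed by withNewChildrenInternal above is `arguments ++ filter.toSeq ++ orderingWithinGroup`, so new children are re-split from the right. A self-contained sketch of that right-aligned re-split (hypothetical helper, plain values instead of expressions):

    // Splits [args... | optional filter | orderings...] back into its parts.
    def resplit[A](newChildren: Seq[A], hasFilter: Boolean, numOrderings: Int)
        : (Seq[A], Option[A], Seq[A]) = {
      val orderings = newChildren.takeRight(numOrderings)
      val rest = newChildren.dropRight(numOrderings)
      val filter = if (hasFilter) Some(rest.last) else None
      val args = if (hasFilter) rest.dropRight(1) else rest
      (args, filter, orderings)
    }

    // resplit(Seq("a1", "a2", "f", "o1"), hasFilter = true, numOrderings = 1)
    //   == (Seq("a1", "a2"), Some("f"), Seq("o1"))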
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
new file mode 100644
index 000000000000..16ba962dc16c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+
+/**
+ * A trait used to set the [[SortOrder]] after an inverse distribution function is parsed.
+ */
+trait SupportsOrderingWithinGroup { self: AggregateFunction =>
+  def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction
+}
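
Both implementations of this trait in this patch (PercentileCont and PercentileDisc below) follow the same shape: validate that exactly one ordering was given, then fold its direction into the function's `reverse` flag. A hedged distillation of that mapping:

    import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, SortOrder}

    // Direction-to-reverse mapping shared by PercentileCont/PercentileDisc.
    def childAndReverse(order: SortOrder): (Expression, Boolean) = order match {
      case SortOrder(child, Ascending, _, _) => (child, false)
      case SortOrder(child, Descending, _, _) => (child, true)
    }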
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index da27ba4b128d..c074b19f2419 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -20,14 +20,14 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult, UnresolvedWithinGroup}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Cast._
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike, UnaryLike}
 import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.TypeCollection.NumericAndAnsiInterval
@@ -359,6 +359,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
   extends AggregateFunction
   with RuntimeReplaceableAggregate
   with ImplicitCastInputTypes
+  with SupportsOrderingWithinGroup
   with BinaryLike[Expression] {
   private lazy val percentile = new Percentile(left, right, reverse)
   override def replacement: Expression = percentile
@@ -374,6 +375,17 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
     percentile.checkInputDataTypes()
   }
 
+  override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = {
+    if (orderingWithinGroup.length != 1) {
+      throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+        nodeName, 1, orderingWithinGroup.length)
+    }
+    orderingWithinGroup.head match {
+      case SortOrder(child, Ascending, _, _) => this.copy(left = child)
+      case SortOrder(child, Descending, _, _) => this.copy(left = child, reverse = true)
+    }
+  }
+
   override protected def withNewChildrenInternal(
       newLeft: Expression, newRight: Expression): PercentileCont =
     this.copy(left = newLeft, right = newRight)
@@ -394,7 +406,7 @@ case class PercentileDisc(
     mutableAggBufferOffset: Int = 0,
     inputAggBufferOffset: Int = 0,
     legacyCalculation: Boolean = SQLConf.get.getConf(SQLConf.LEGACY_PERCENTILE_DISC_CALCULATION))
-  extends PercentileBase with BinaryLike[Expression] {
+  extends PercentileBase with SupportsOrderingWithinGroup with BinaryLike[Expression] {
 
   val frequencyExpression: Expression = Literal(1L)
 
@@ -417,6 +429,17 @@ case class PercentileDisc(
     s"$prettyName($distinct${right.sql}) WITHIN GROUP (ORDER BY ${left.sql}$direction)"
   }
 
+  override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction = {
+    if (orderingWithinGroup.length != 1) {
+      throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+        nodeName, 1, orderingWithinGroup.length)
+    }
+    orderingWithinGroup.head match {
+      case SortOrder(expr, Ascending, _, _) => this.copy(child = expr)
+      case SortOrder(expr, Descending, _, _) => this.copy(child = expr, reverse = true)
+    }
+  }
+
   override protected def withNewChildrenInternal(
       newLeft: Expression, newRight: Expression): PercentileDisc = copy(
     child = newLeft,
@@ -444,3 +467,55 @@ case class PercentileDisc(
     }
   }
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(percentage) WITHIN GROUP (ORDER BY col) - Return a percentile value based on " +
+    "a continuous distribution of numeric or ANSI interval column `col` at the given " +
+    "`percentage` (specified in ORDER BY clause).",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(0.25) WITHIN GROUP (ORDER BY col) FROM VALUES (0), (10) AS tab(col);
+       2.5
+      > SELECT _FUNC_(0.25) WITHIN GROUP (ORDER BY col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+       0-2
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+// scalastyle:on line.size.limit
+object PercentileContBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    val numArgs = expressions.length
+    if (numArgs == 1) {
+      PercentileCont(UnresolvedWithinGroup, expressions(0))
+    } else {
+      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(1), numArgs)
+    }
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(percentage) WITHIN GROUP (ORDER BY col) - Return a percentile value based on " +
+    "a discrete distribution of numeric or ANSI interval column `col` at the given " +
+    "`percentage` (specified in ORDER BY clause).",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(0.25) WITHIN GROUP (ORDER BY col) FROM VALUES (0), (10) AS tab(col);
+       0.0
+      > SELECT _FUNC_(0.25) WITHIN GROUP (ORDER BY col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+       0-0
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+// scalastyle:on line.size.limit
+object PercentileDiscBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
+    val numArgs = expressions.length
+    if (numArgs == 1) {
+      PercentileDisc(UnresolvedWithinGroup, expressions(0))
+    } else {
+      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(1), numArgs)
+    }
+  }
+}
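
The builders take over the arity checking that the old grammar rule enforced syntactically; hedged examples, with expected errors taken from the new golden files below:

    spark.sql("SELECT percentile_cont() WITHIN GROUP (ORDER BY v) FROM aggr")
    // -> WRONG_NUM_ARGS.WITHOUT_SUGGESTION (expected 1 argument, got 0)
    spark.sql("SELECT percentile_cont(k, 0.5) WITHIN GROUP (ORDER BY v) FROM aggr")
    // -> WRONG_NUM_ARGS.WITHOUT_SUGGESTION (expected 1 argument, got 2)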
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 49bde72c48db..4af1801aaa18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableId
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, ClusterBySpec}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First, Last, PercentileCont, PercentileDisc}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -2206,35 +2206,6 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     UnresolvedFunction("extract", arguments, isDistinct = false)
   }
 
-  /**
-   * Create a Percentile expression.
-   */
-  override def visitPercentile(ctx: PercentileContext): Expression = withOrigin(ctx) {
-    val percentage = expression(ctx.percentage)
-    val sortOrder = visitSortItem(ctx.sortItem)
-    val percentile = ctx.name.getType match {
-      case SqlBaseParser.PERCENTILE_CONT =>
-        sortOrder.direction match {
-          case Ascending => PercentileCont(sortOrder.child, percentage)
-          case Descending => PercentileCont(sortOrder.child, percentage, true)
-        }
-      case SqlBaseParser.PERCENTILE_DISC =>
-        sortOrder.direction match {
-          case Ascending => PercentileDisc(sortOrder.child, percentage)
-          case Descending => PercentileDisc(sortOrder.child, percentage, true)
-        }
-    }
-    val filter = Option(ctx.where).map(expression(_))
-    val aggregateExpression = percentile.toAggregateExpression(false, filter)
-    ctx.windowSpec match {
-      case spec: WindowRefContext =>
-        UnresolvedWindowExpression(aggregateExpression, visitWindowRef(spec))
-      case spec: WindowDefContext =>
-        WindowExpression(aggregateExpression, visitWindowDef(spec))
-      case _ => aggregateExpression
-    }
-  }
-
   /**
    * Create a Substring/Substr expression.
    */
@@ -2296,6 +2267,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
       case expressions =>
         expressions
     }
+    val order = ctx.sortItem.asScala.map(visitSortItem)
     val filter = Option(ctx.where).map(expression(_))
     val ignoreNulls =
       Option(ctx.nullsOption).map(_.getType == SqlBaseParser.IGNORE).getOrElse(false)
@@ -2313,7 +2285,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
       val funcCtx = ctx.functionName
       val func = withFuncIdentClause(
         funcCtx,
-        ident => UnresolvedFunction(ident, arguments, isDistinct, filter, ignoreNulls)
+        ident => UnresolvedFunction(ident, arguments, isDistinct, filter, ignoreNulls, order.toSeq)
       )
 
       // Check if the function is evaluated in a windowed context.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index bbc51e5cbd67..b7e10dc194a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -639,6 +639,30 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map.empty)
   }
 
+  def distinctInverseDistributionFunctionUnsupportedError(funcName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+      messageParameters = Map("funcName" -> toSQLId(funcName)))
+  }
+
+  def inverseDistributionFunctionMissingWithinGroupError(funcName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+      messageParameters = Map("funcName" -> toSQLId(funcName)))
+  }
+
+  def wrongNumOrderingsForInverseDistributionFunctionError(
+      funcName: String,
+      validOrderingsNumber: Int,
+      actualOrderingsNumber: Int): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+      messageParameters = Map(
+        "funcName" -> toSQLId(funcName),
+        "expectedNum" -> validOrderingsNumber.toString,
+        "actualNum" -> actualOrderingsNumber.toString))
+  }
+
   def aliasNumberNotMatchColumnNumberError(
       columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = {
     new AnalysisException(
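
Since these helpers raise AnalysisException with stable error classes, callers can match on them programmatically; a hedged sketch (assumes SparkThrowable's getErrorClass accessor, believed available at this point):

    import org.apache.spark.sql.AnalysisException

    try {
      spark.sql("SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY k, v) FROM aggr").collect()
    } catch {
      case e: AnalysisException =>
        assert(e.getErrorClass == "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS")
    }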
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index c483aa1997fc..17dd7349e7be 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -23,7 +23,6 @@ import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedParameter, PosParameter, RelationTimeTravel, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, UnresolvedTVFAliases}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
@@ -1813,38 +1812,62 @@ class PlanParserSuite extends AnalysisTest {
 
     assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col)",
-      PercentileCont(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
-        .toAggregateExpression()
+      UnresolvedFunction(
+        Seq("PERCENTILE_CONT"),
+        Seq(Literal(Decimal(0.1), DecimalType(1, 1))),
+        false,
+        None,
+        orderingWithinGroup = Seq(SortOrder(UnresolvedAttribute("col"), Ascending)))
     )
 
     assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col DESC)",
-      PercentileCont(UnresolvedAttribute("col"),
-        Literal(Decimal(0.1), DecimalType(1, 1)), true).toAggregateExpression()
+      UnresolvedFunction(
+        Seq("PERCENTILE_CONT"),
+        Seq(Literal(Decimal(0.1), DecimalType(1, 1))),
+        false,
+        None,
+        orderingWithinGroup = Seq(SortOrder(UnresolvedAttribute("col"), Descending)))
     )
 
     assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col) FILTER (WHERE id > 10)",
-      PercentileCont(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
-        .toAggregateExpression(false, Some(GreaterThan(UnresolvedAttribute("id"), Literal(10))))
+      UnresolvedFunction(
+        Seq("PERCENTILE_CONT"),
+        Seq(Literal(Decimal(0.1), DecimalType(1, 1))),
+        false,
+        Some(GreaterThan(UnresolvedAttribute("id"), Literal(10))),
+        orderingWithinGroup = Seq(SortOrder(UnresolvedAttribute("col"), Ascending)))
     )
 
     assertPercentilePlans(
       "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col)",
-      PercentileDisc(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
-        .toAggregateExpression()
+      UnresolvedFunction(
+        Seq("PERCENTILE_DISC"),
+        Seq(Literal(Decimal(0.1), DecimalType(1, 1))),
+        false,
+        None,
+        orderingWithinGroup = Seq(SortOrder(UnresolvedAttribute("col"), Ascending)))
     )
 
     assertPercentilePlans(
       "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col DESC)",
-      PercentileDisc(UnresolvedAttribute("col"),
-        Literal(Decimal(0.1), DecimalType(1, 1)), true).toAggregateExpression()
+      UnresolvedFunction(
+        Seq("PERCENTILE_DISC"),
+        Seq(Literal(Decimal(0.1), DecimalType(1, 1))),
+        false,
+        None,
+        orderingWithinGroup = Seq(SortOrder(UnresolvedAttribute("col"), Descending)))
     )
 
     assertPercentilePlans(
       "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col) FILTER (WHERE id > 10)",
-      PercentileDisc(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
-        .toAggregateExpression(false, Some(GreaterThan(UnresolvedAttribute("id"), Literal(10))))
+      UnresolvedFunction(
+        Seq("PERCENTILE_DISC"),
+        Seq(Literal(Decimal(0.1), DecimalType(1, 1))),
+        false,
+        Some(GreaterThan(UnresolvedAttribute("id"), Literal(10))),
+        orderingWithinGroup = Seq(SortOrder(UnresolvedAttribute("col"), Ascending)))
     )
   }
 
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 8e6bad11c09a..1cdd061e1d3d 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -409,6 +409,8 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.MinBy | min_by | SELECT min_by(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y) | struct<min_by(x, y):string> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Mode | mode | SELECT mode(col) FROM VALUES (0), (10), (10) AS tab(col) | struct<mode(col, false):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Percentile | percentile | SELECT percentile(col, 0.3) FROM VALUES (0), (10) AS tab(col) | struct<percentile(col, 0.3, 1):double> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.PercentileContBuilder | percentile_cont | SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY col) FROM VALUES (0), (10) AS tab(col) | struct<percentile_cont(0.25) WITHIN GROUP (ORDER BY col):double> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.PercentileDiscBuilder | percentile_disc | SELECT percentile_disc(0.25) WITHIN GROUP (ORDER BY col) FROM VALUES (0), (10) AS tab(col) | struct<percentile_disc(0.25) WITHIN GROUP (ORDER BY col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.RegrAvgX | regr_avgx | SELECT regr_avgx(y, x) FROM VALUES (1, 2), (2, 2), (2, 3), (2, 4) AS tab(y, x) | struct<regr_avgx(y, x):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.RegrAvgY | regr_avgy | SELECT regr_avgy(y, x) FROM VALUES (1, 2), (2, 2), (2, 3), (2, 4) AS tab(y, x) | struct<regr_avgy(y, x):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.RegrCount | regr_count | SELECT regr_count(y, x) FROM VALUES (1, 2), (2, 2), (2, 3), (2, 4) AS tab(y, x) | struct<regr_count(y, x):bigint> |
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
index 1ab3ecec920d..90ef5ac35a1e 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/percentiles.sql.out
@@ -149,6 +149,281 @@ Aggregate [median(v#x) AS median(v)#x, percentile(v#x, cast(0.5 as double), 1, 0
                +- LocalRelation [k#x, v#x]
 
 
+-- !query
+SELECT
+  round(v, 0) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "round",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 46,
+    "fragment" : "round(v, 0) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  round(v, 0) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "round",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 68,
+    "fragment" : "round(v, 0) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile(v, 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "percentile",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 53,
+    "fragment" : "percentile(v, 0.5) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile(v, 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "percentile",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 75,
+    "fragment" : "percentile(v, 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 64,
+    "fragment" : "percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 86,
+    "fragment" : "percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont() WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "1",
+    "functionName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 52,
+    "fragment" : "percentile_cont() WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont() WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "1",
+    "functionName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 74,
+    "fragment" : "percentile_cont() WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(0.5)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 29,
+    "fragment" : "percentile_cont(0.5)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(0.5) OVER (PARTITION BY k)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 51,
+    "fragment" : "percentile_cont(0.5) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(0.5) WITHIN GROUP (ORDER BY k, v)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "actualNum" : "2",
+    "expectedNum" : "1",
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 58,
+    "fragment" : "percentile_cont(0.5) WITHIN GROUP (ORDER BY k, v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(k, 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "2",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "1",
+    "functionName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 58,
+    "fragment" : "percentile_cont(k, 0.5) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
 -- !query
 SELECT
   k,
diff --git a/sql/core/src/test/resources/sql-tests/inputs/percentiles.sql b/sql/core/src/test/resources/sql-tests/inputs/percentiles.sql
index eae8a71be7e5..4b3e8708222a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/percentiles.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/percentiles.sql
@@ -63,6 +63,54 @@ SELECT
   percentile_cont(0.5) WITHIN GROUP (ORDER BY v)
 FROM aggr;
 
+SELECT
+  round(v, 0) WITHIN GROUP (ORDER BY v)
+FROM aggr;
+
+SELECT
+  round(v, 0) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr;
+
+SELECT
+  percentile(v, 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr;
+
+SELECT
+  percentile(v, 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr;
+
+SELECT
+  percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr;
+
+SELECT
+  percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr;
+
+SELECT
+  percentile_cont() WITHIN GROUP (ORDER BY v)
+FROM aggr;
+
+SELECT
+  percentile_cont() WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr;
+
+SELECT
+  percentile_cont(0.5)
+FROM aggr;
+
+SELECT
+  percentile_cont(0.5) OVER (PARTITION BY k)
+FROM aggr;
+
+SELECT
+  percentile_cont(0.5) WITHIN GROUP (ORDER BY k, v)
+FROM aggr;
+
+SELECT
+  percentile_cont(k, 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr;
+
 SELECT
   k,
   median(v),
diff --git a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
index 70d3155d3173..5dc430522e51 100644
--- a/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/percentiles.sql.out
@@ -113,6 +113,305 @@ struct<median(v):double,percentile(v, 0.5, 1):double,percentile_cont(0.5) WITHIN
 20.0   20.0    20.0
 
 
+-- !query
+SELECT
+  round(v, 0) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "round",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 46,
+    "fragment" : "round(v, 0) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  round(v, 0) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "round",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 68,
+    "fragment" : "round(v, 0) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile(v, 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "percentile",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 53,
+    "fragment" : "percentile(v, 0.5) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile(v, 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1023",
+  "messageParameters" : {
+    "prettyName" : "percentile",
+    "syntax" : "WITHIN GROUP (ORDER BY ...)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 75,
+    "fragment" : "percentile(v, 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 64,
+    "fragment" : "percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 86,
+    "fragment" : "percentile_cont(DISTINCT 0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont() WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "1",
+    "functionName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 52,
+    "fragment" : "percentile_cont() WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont() WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "1",
+    "functionName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 74,
+    "fragment" : "percentile_cont() WITHIN GROUP (ORDER BY v) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(0.5)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 29,
+    "fragment" : "percentile_cont(0.5)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(0.5) OVER (PARTITION BY k)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 51,
+    "fragment" : "percentile_cont(0.5) OVER (PARTITION BY k)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(0.5) WITHIN GROUP (ORDER BY k, v)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "actualNum" : "2",
+    "expectedNum" : "1",
+    "funcName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 58,
+    "fragment" : "percentile_cont(0.5) WITHIN GROUP (ORDER BY k, v)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  percentile_cont(k, 0.5) WITHIN GROUP (ORDER BY v)
+FROM aggr
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "2",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "1",
+    "functionName" : "`percentile_cont`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 58,
+    "fragment" : "percentile_cont(k, 0.5) WITHIN GROUP (ORDER BY v)"
+  } ]
+}
+
+
 -- !query
 SELECT
   k,

