This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f46ed57dc0 [SPARK-45796][SQL] Support MODE() WITHIN GROUP (ORDER BY 
col)
e3f46ed57dc0 is described below

commit e3f46ed57dc063566cdb9425b4d5e02c65332df1
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Dec 14 19:20:48 2023 +0800

    [SPARK-45796][SQL] Support MODE() WITHIN GROUP (ORDER BY col)
    
    ### What changes were proposed in this pull request?
    Mainstream databases support this syntax.
    **H2**
    http://www.h2database.com/html/functions-aggregate.html#mode
    **Postgres**
    https://www.postgresql.org/docs/16/functions-aggregate.html
    
    **Syntax**:
    Aggregate function
    `MODE() WITHIN GROUP (ORDER BY sortSpecification)`
    
    Window function
    ```
    MODE() WITHIN GROUP (ORDER BY sortSpecification)
    [FILTER (WHERE expression)] [OVER windowNameOrSpecification]
    ```
    
    **Examples**:
    ```
    SELECT
      mode() WITHIN GROUP (ORDER BY v),
      mode() WITHIN GROUP (ORDER BY v) FILTER (WHERE k > 0)
    FROM aggr;
    ```
    
    ```
    SELECT
        employee_name,
        department,
        salary,
        mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
        mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE department = 
'Accounting') OVER (PARTITION BY department)
    FROM basic_pays
    ORDER BY salary;
    ```
    
    ### Why are the changes needed?
    Support `MODE() WITHIN GROUP (ORDER BY col)`
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    New test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #44184 from beliefer/SPARK-45796_new.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../explain-results/function_mode.explain          |   2 +-
 python/pyspark/sql/functions/builtin.py            |  22 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  10 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +-
 .../sql/catalyst/expressions/aggregate/Mode.scala  | 185 +++++---
 .../aggregate/SupportsOrderingWithinGroup.scala    |   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   9 +
 .../sql-functions/sql-expression-schema.md         |   2 +-
 .../sql-tests/analyzer-results/group-by.sql.out    | 141 ------
 .../sql-tests/analyzer-results/mode.sql.out        | 511 +++++++++++++++++++++
 .../test/resources/sql-tests/inputs/group-by.sql   |  16 -
 .../src/test/resources/sql-tests/inputs/mode.sql   | 139 ++++++
 .../resources/sql-tests/results/group-by.sql.out   | 147 ------
 .../test/resources/sql-tests/results/mode.sql.out  | 495 ++++++++++++++++++++
 14 files changed, 1290 insertions(+), 392 deletions(-)

diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
index 28bbb44b0fda..0952c9a14ef3 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
@@ -1,2 +1,2 @@
-Aggregate [mode(a#0, 0, 0, false) AS mode(a, false)#0]
+Aggregate [mode(a#0, 0, 0, None) AS mode(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index e1cffff01018..17a64b1a4f1d 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -798,12 +798,12 @@ def mode(col: "ColumnOrName", deterministic: bool = 
False) -> Column:
     ...     ("dotNET", 2013, 48000), ("Java", 2013, 30000)],
     ...     schema=("course", "year", "earnings"))
     >>> df.groupby("course").agg(mode("year")).show()
-    +------+-----------------+
-    |course|mode(year, false)|
-    +------+-----------------+
-    |  Java|             2012|
-    |dotNET|             2012|
-    +------+-----------------+
+    +------+----------+
+    |course|mode(year)|
+    +------+----------+
+    |  Java|      2012|
+    |dotNET|      2012|
+    +------+----------+
 
     When multiple values have the same greatest frequency then either any of 
values is returned if
     deterministic is false or is not defined, or the lowest value is returned 
if deterministic is
@@ -811,11 +811,11 @@ def mode(col: "ColumnOrName", deterministic: bool = 
False) -> Column:
 
     >>> df2 = spark.createDataFrame([(-10,), (0,), (10,)], ["col"])
     >>> df2.select(mode("col", False), mode("col", True)).show()
-    +----------------+---------------+
-    |mode(col, false)|mode(col, true)|
-    +----------------+---------------+
-    |               0|            -10|
-    +----------------+---------------+
+    +---------+---------------------------------------+
+    |mode(col)|mode() WITHIN GROUP (ORDER BY col DESC)|
+    +---------+---------------------------------------+
+    |        0|                                    -10|
+    +---------+---------------------------------------+
     """
     return _invoke_function("mode", _to_java_column(col), deterministic)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ec91f9b21a76..acb9b610dc09 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2333,9 +2333,14 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         case owg: SupportsOrderingWithinGroup if u.isDistinct =>
           throw 
QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError(
             owg.prettyName)
-        case owg: SupportsOrderingWithinGroup if u.orderingWithinGroup.isEmpty 
=>
+        case owg: SupportsOrderingWithinGroup
+          if !owg.orderingFilled && u.orderingWithinGroup.isEmpty =>
           throw 
QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError(
             owg.prettyName)
+        case owg: SupportsOrderingWithinGroup
+          if owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
+          throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+            owg.prettyName, 0, u.orderingWithinGroup.length)
         case f
           if !f.isInstanceOf[SupportsOrderingWithinGroup] && 
u.orderingWithinGroup.nonEmpty =>
           throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
@@ -2384,7 +2389,8 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           if (agg.isInstanceOf[PythonUDAF]) 
checkUnsupportedAggregateClause(agg, u)
           // After parse, the inverse distribution functions not set the 
ordering within group yet.
           val newAgg = agg match {
-            case owg: SupportsOrderingWithinGroup if 
u.orderingWithinGroup.nonEmpty =>
+            case owg: SupportsOrderingWithinGroup
+              if !owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
               owg.withOrderingWithinGroup(u.orderingWithinGroup)
             case _ =>
               agg
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 016f764c7002..24e2bb767ab4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -511,7 +511,7 @@ object FunctionRegistry {
     expression[RegrSYY]("regr_syy"),
     expression[RegrSlope]("regr_slope"),
     expression[RegrIntercept]("regr_intercept"),
-    expression[Mode]("mode"),
+    expressionBuilder("mode", ModeBuilder),
     expression[HllSketchAgg]("hll_sketch_agg"),
     expression[HllUnionAgg]("hll_union_agg"),
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index 4ac44d9d2c9f..d1a9cafdf61f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -18,88 +18,35 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, 
TypeCheckSuccess}
-import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExpressionDescription, ImplicitCastInputTypes, Literal}
-import org.apache.spark.sql.catalyst.trees.{BinaryLike, UnaryLike}
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, 
UnresolvedWithinGroup}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, 
Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
 import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
-import org.apache.spark.sql.errors.DataTypeErrors.{toSQLId, toSQLType}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, 
BooleanType, DataType}
 import org.apache.spark.util.collection.OpenHashMap
 
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = """
-    _FUNC_(col[, deterministic]) - Returns the most frequent value for the 
values within `col`. NULL values are ignored. If all the values are NULL, or 
there are 0 rows, returns NULL.
-      When multiple values have the same greatest frequency then either any of 
values is returned if `deterministic` is false or is not defined, or the lowest 
value is returned if `deterministic` is true.""",
-  examples = """
-    Examples:
-      > SELECT _FUNC_(col) FROM VALUES (0), (10), (10) AS tab(col);
-       10
-      > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' 
MONTH), (INTERVAL '10' MONTH) AS tab(col);
-       0-10
-      > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) 
AS tab(col);
-       10
-      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
-       0
-      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
-       -10
-  """,
-  group = "agg_funcs",
-  since = "3.4.0")
-// scalastyle:on line.size.limit
 case class Mode(
     child: Expression,
     mutableAggBufferOffset: Int = 0,
     inputAggBufferOffset: Int = 0,
-    deterministicExpr: Expression = Literal.FalseLiteral)
+    reverseOpt: Option[Boolean] = None)
   extends TypedAggregateWithHashMapAsBuffer with ImplicitCastInputTypes
-    with BinaryLike[Expression] {
+    with SupportsOrderingWithinGroup with UnaryLike[Expression] {
 
   def this(child: Expression) = this(child, 0, 0)
 
-  def this(child: Expression, deterministicExpr: Expression) = {
-    this(child, 0, 0, deterministicExpr)
+  def this(child: Expression, reverse: Boolean) = {
+    this(child, 0, 0, Some(reverse))
   }
 
-  @transient
-  protected lazy val deterministicResult = 
deterministicExpr.eval().asInstanceOf[Boolean]
-
-  override def left: Expression = child
-
-  override def right: Expression = deterministicExpr
-
   // Returns null for empty inputs
   override def nullable: Boolean = true
 
   override def dataType: DataType = child.dataType
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
BooleanType)
-
-  override def checkInputDataTypes(): TypeCheckResult = {
-    val defaultCheck = super.checkInputDataTypes()
-    if (defaultCheck.isFailure) {
-      return defaultCheck
-    }
-    if (!deterministicExpr.foldable) {
-      DataTypeMismatch(
-        errorSubClass = "NON_FOLDABLE_INPUT",
-        messageParameters = Map(
-          "inputName" -> toSQLId("deterministic"),
-          "inputType" -> toSQLType(deterministicExpr.dataType),
-          "inputExpr" -> toSQLExpr(deterministicExpr)
-        )
-      )
-    } else if (deterministicExpr.eval() == null) {
-      DataTypeMismatch(
-        errorSubClass = "UNEXPECTED_NULL",
-        messageParameters = Map("exprName" -> toSQLId("deterministic")))
-    } else {
-      TypeCheckSuccess
-    }
-  }
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
 
   override def prettyName: String = "mode"
 
@@ -128,16 +75,15 @@ case class Mode(
       return null
     }
 
-    (if (deterministicResult) {
-      // When deterministic result is rquired but multiple keys have the same 
greatest frequency
-      // then let's select the lowest.
-      val defaultKeyOrdering =
+    reverseOpt.map { reverse =>
+      val defaultKeyOrdering = if (reverse) {
+        
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]].reverse
+      } else {
         
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]]
-      val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering.reverse)
+      }
+      val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering)
       buffer.maxBy { case (key, count) => (count, key) }(ordering)
-    } else {
-      buffer.maxBy(_._2)
-    })._1
+    }.getOrElse(buffer.maxBy(_._2))._1
   }
 
   override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): 
Mode =
@@ -146,8 +92,103 @@ case class Mode(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): Mode 
=
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 
-  override def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): Expression =
-    copy(child = newLeft, deterministicExpr = newRight)
+  override def sql(isDistinct: Boolean): String = {
+    reverseOpt.map {
+      reverse =>
+        if (reverse) {
+          s"$prettyName() WITHIN GROUP (ORDER BY ${child.sql} DESC)"
+        } else {
+          s"$prettyName() WITHIN GROUP (ORDER BY ${child.sql})"
+        }
+    }.getOrElse(super.sql(isDistinct))
+  }
+
+  override def orderingFilled: Boolean = child != UnresolvedWithinGroup
+
+  assert(orderingFilled || (!orderingFilled && reverseOpt.isEmpty))
+
+  override def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction = {
+    child match {
+      case UnresolvedWithinGroup =>
+        if (orderingWithinGroup.length != 1) {
+          throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+            nodeName, 1, orderingWithinGroup.length)
+        }
+        orderingWithinGroup.head match {
+          case SortOrder(child, Ascending, _, _) =>
+            this.copy(child = child, reverseOpt = Some(true))
+          case SortOrder(child, Descending, _, _) =>
+            this.copy(child = child, reverseOpt = Some(false))
+        }
+      case _ => this
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): 
Expression =
+    copy(child = newChild)
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(col[, deterministic]) - Returns the most frequent value for the 
values within `col`. NULL values are ignored. If all the values are NULL, or 
there are 0 rows, returns NULL.
+      When multiple values have the same greatest frequency then either any of 
values is returned if `deterministic` is false or is not defined, or the lowest 
value is returned if `deterministic` is true.
+    _FUNC_() WITHIN GROUP (ORDER BY col) - Returns the most frequent value for 
the values within `col` (specified in ORDER BY clause). NULL values are ignored.
+      If all the values are NULL, or there are 0 rows, returns NULL. When 
multiple values have the same greatest frequency only one value will be 
returned.
+      The value will be chosen based on sort direction. Return the smallest 
value if sort direction is asc or the largest value if sort direction is desc 
from multiple values with the same frequency.""",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES (0), (10), (10) AS tab(col);
+       10
+      > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' 
MONTH), (INTERVAL '10' MONTH) AS tab(col);
+       0-10
+      > SELECT _FUNC_(col) FROM VALUES (0), (10), (10), (null), (null), (null) 
AS tab(col);
+       10
+      > SELECT _FUNC_(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+       0
+      > SELECT _FUNC_(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+       -10
+      > SELECT _FUNC_() WITHIN GROUP (ORDER BY col) FROM VALUES (0), (10), 
(10) AS tab(col);
+       10
+      > SELECT _FUNC_() WITHIN GROUP (ORDER BY col) FROM VALUES (0), (10), 
(10), (20), (20) AS tab(col);
+       10
+      > SELECT _FUNC_() WITHIN GROUP (ORDER BY col DESC) FROM VALUES (0), 
(10), (10), (20), (20) AS tab(col);
+       20
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit
+object ModeBuilder extends ExpressionBuilder {
+  override def build(funcName: String, expressions: Seq[Expression]): 
Expression = {
+    val numArgs = expressions.length
+    if (numArgs == 0) {
+      Mode(UnresolvedWithinGroup)
+    } else if (numArgs == 1) {
+      // For compatibility with function calls without WITHIN GROUP.
+      Mode(expressions(0))
+    } else if (numArgs == 2) {
+      // For compatibility with function calls without WITHIN GROUP.
+      if (!expressions(1).foldable) {
+        throw QueryCompilationErrors.nonFoldableArgumentError(
+          funcName, "deterministic", BooleanType)
+      }
+      val deterministicResult = expressions(1).eval()
+      if (deterministicResult == null) {
+        throw QueryCompilationErrors.unexpectedNullError("deterministic", 
expressions(1))
+      }
+      if (expressions(1).dataType != BooleanType) {
+        throw QueryCompilationErrors.unexpectedInputDataTypeError(
+          funcName, 2, BooleanType, expressions(1))
+      }
+      if (deterministicResult.asInstanceOf[Boolean]) {
+        new Mode(expressions(0), true)
+      } else {
+        Mode(expressions(0))
+      }
+    } else {
+      throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(0), numArgs)
+    }
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
index 16ba962dc16c..9c0502a2c1fc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala
@@ -23,5 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.SortOrder
  * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
  */
 trait SupportsOrderingWithinGroup { self: AggregateFunction =>
+  def orderingFilled: Boolean = false
   def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c1ee9b49d8de..5f49fe03cba7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1954,6 +1954,15 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "inputType" -> toSQLType(expression.dataType)))
   }
 
+  def unexpectedNullError(exprName: String, expression: Expression): Throwable 
= {
+    new AnalysisException(
+      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+      messageParameters = Map(
+        "exprName" -> toSQLId(exprName),
+        "sqlExpr" -> toSQLExpr(expression)
+      ))
+  }
+
   def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan: 
LogicalPlan): Throwable = {
     new ExtendedAnalysisException(
       new AnalysisException(errorClass = "_LEGACY_ERROR_TEMP_1181", 
messageParameters = Map.empty),
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 053b3c56b29e..6cc42ba9c902 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -407,7 +407,7 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.Median | median | SELECT 
median(col) FROM VALUES (0), (10) AS tab(col) | struct<median(col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Min | min | SELECT 
min(col) FROM VALUES (10), (-1), (20) AS tab(col) | struct<min(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.MinBy | min_by | SELECT 
min_by(x, y) FROM VALUES ('a', 10), ('b', 50), ('c', 20) AS tab(x, y) | 
struct<min_by(x, y):string> |
-| org.apache.spark.sql.catalyst.expressions.aggregate.Mode | mode | SELECT 
mode(col) FROM VALUES (0), (10), (10) AS tab(col) | struct<mode(col, 
false):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.ModeBuilder | mode | 
SELECT mode(col) FROM VALUES (0), (10), (10) AS tab(col) | 
struct<mode(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Percentile | percentile 
| SELECT percentile(col, 0.3) FROM VALUES (0), (10) AS tab(col) | 
struct<percentile(col, 0.3, 1):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.PercentileContBuilder | 
percentile_cont | SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY col) FROM 
VALUES (0), (10) AS tab(col) | struct<percentile_cont(0.25) WITHIN GROUP (ORDER 
BY col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.PercentileDiscBuilder | 
percentile_disc | SELECT percentile_disc(0.25) WITHIN GROUP (ORDER BY col) FROM 
VALUES (0), (10) AS tab(col) | struct<percentile_disc(0.25) WITHIN GROUP (ORDER 
BY col):double> |
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/group-by.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/group-by.sql.out
index 56b2553045f3..147a7e9375a5 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/group-by.sql.out
@@ -1152,31 +1152,6 @@ Aggregate [a#x], [a#x, collect_list(b#x, 0, 0) AS 
collect_list(b)#x, collect_lis
    +- LocalRelation [a#x, b#x]
 
 
--- !query
-SELECT mode(a), mode(b) FROM testData
--- !query analysis
-Aggregate [mode(a#x, 0, 0, false) AS mode(a, false)#x, mode(b#x, 0, 0, false) 
AS mode(b, false)#x]
-+- SubqueryAlias testdata
-   +- View (`testData`, [a#x,b#x])
-      +- Project [cast(a#x as int) AS a#x, cast(b#x as int) AS b#x]
-         +- Project [a#x, b#x]
-            +- SubqueryAlias testData
-               +- LocalRelation [a#x, b#x]
-
-
--- !query
-SELECT a, mode(b) FROM testData GROUP BY a ORDER BY a
--- !query analysis
-Sort [a#x ASC NULLS FIRST], true
-+- Aggregate [a#x], [a#x, mode(b#x, 0, 0, false) AS mode(b, false)#x]
-   +- SubqueryAlias testdata
-      +- View (`testData`, [a#x,b#x])
-         +- Project [cast(a#x as int) AS a#x, cast(b#x as int) AS b#x]
-            +- Project [a#x, b#x]
-               +- SubqueryAlias testData
-                  +- LocalRelation [a#x, b#x]
-
-
 -- !query
 SELECT c * 2 AS d
 FROM (
@@ -1196,119 +1171,3 @@ Aggregate [c#x], [(c#x * 2) AS d#x]
          +- Project [if ((a#x < 0)) 0 else a#x AS b#x]
             +- SubqueryAlias t1
                +- LocalRelation [a#x]
-
-
--- !query
-SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query analysis
-Aggregate [mode(col#x, 0, 0, false) AS mode(col, false)#x]
-+- SubqueryAlias tab
-   +- LocalRelation [col#x]
-
-
--- !query
-SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query analysis
-Aggregate [mode(col#x, 0, 0, false) AS mode(col, false)#x]
-+- SubqueryAlias tab
-   +- LocalRelation [col#x]
-
-
--- !query
-SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query analysis
-Aggregate [mode(col#x, 0, 0, true) AS mode(col, true)#x]
-+- SubqueryAlias tab
-   +- LocalRelation [col#x]
-
-
--- !query
-SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col)
--- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
-  "sqlState" : "42K09",
-  "messageParameters" : {
-    "inputSql" : "\"true\"",
-    "inputType" : "\"STRING\"",
-    "paramIndex" : "2",
-    "requiredType" : "\"BOOLEAN\"",
-    "sqlExpr" : "\"mode(col, true)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 8,
-    "stopIndex" : 24,
-    "fragment" : "mode(col, 'true')"
-  } ]
-}
-
-
--- !query
-SELECT mode(col, null) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
-  "sqlState" : "42K09",
-  "messageParameters" : {
-    "exprName" : "`deterministic`",
-    "sqlExpr" : "\"mode(col, NULL)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 8,
-    "stopIndex" : 22,
-    "fragment" : "mode(col, null)"
-  } ]
-}
-
-
--- !query
-SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS 
tab(col, b)
--- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
-  "sqlState" : "42K09",
-  "messageParameters" : {
-    "inputExpr" : "\"b\"",
-    "inputName" : "`deterministic`",
-    "inputType" : "\"BOOLEAN\"",
-    "sqlExpr" : "\"mode(col, b)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 8,
-    "stopIndex" : 19,
-    "fragment" : "mode(col, b)"
-  } ]
-}
-
-
--- !query
-SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col)
--- !query analysis
-Aggregate [mode(col#x, 0, 0, false) AS mode(col, false)#x]
-+- SubqueryAlias tab
-   +- LocalRelation [col#x]
-
-
--- !query
-SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col)
--- !query analysis
-Aggregate [mode(col#x, 0, 0, false) AS mode(col, false)#x]
-+- SubqueryAlias tab
-   +- LocalRelation [col#x]
-
-
--- !query
-SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
--- !query analysis
-Aggregate [mode(col#x, 0, 0, true) AS mode(col, true)#x]
-+- SubqueryAlias tab
-   +- LocalRelation [col#x]
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
new file mode 100644
index 000000000000..61b970a628e9
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/mode.sql.out
@@ -0,0 +1,511 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW basic_pays AS SELECT * FROM VALUES
+('Diane Murphy','Accounting',8435),
+('Mary Patterson','Accounting',9998),
+('Jeff Firrelli','Accounting',8992),
+('William Patterson','Accounting',8870),
+('Gerard Bondur','Accounting',11472),
+('Anthony Bow','Accounting',6627),
+('Leslie Jennings','IT',8113),
+('Leslie Thompson','IT',5186),
+('Julie Firrelli','Sales',9181),
+('Steve Patterson','Sales',9441),
+('Foon Yue Tseng','Sales',6660),
+('George Vanauf','Sales',10563),
+('Loui Bondur','SCM',10449),
+('Gerard Hernandez','SCM',6949),
+('Pamela Castillo','SCM',11303),
+('Larry Bott','SCM',11798),
+('Barry Jones','SCM',10586)
+AS basic_pays(employee_name, department, salary)
+-- !query analysis
+CreateViewCommand `basic_pays`, SELECT * FROM VALUES
+('Diane Murphy','Accounting',8435),
+('Mary Patterson','Accounting',9998),
+('Jeff Firrelli','Accounting',8992),
+('William Patterson','Accounting',8870),
+('Gerard Bondur','Accounting',11472),
+('Anthony Bow','Accounting',6627),
+('Leslie Jennings','IT',8113),
+('Leslie Thompson','IT',5186),
+('Julie Firrelli','Sales',9181),
+('Steve Patterson','Sales',9441),
+('Foon Yue Tseng','Sales',6660),
+('George Vanauf','Sales',10563),
+('Loui Bondur','SCM',10449),
+('Gerard Hernandez','SCM',6949),
+('Pamela Castillo','SCM',11303),
+('Larry Bott','SCM',11798),
+('Barry Jones','SCM',10586)
+AS basic_pays(employee_name, department, salary), false, true, LocalTempView, 
true
+   +- Project [employee_name#x, department#x, salary#x]
+      +- SubqueryAlias basic_pays
+         +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT mode(department), mode(salary) FROM basic_pays
+-- !query analysis
+Aggregate [mode(department#x, 0, 0, None) AS mode(department)#x, 
mode(salary#x, 0, 0, None) AS mode(salary)#x]
++- SubqueryAlias basic_pays
+   +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+      +- Project [cast(employee_name#x as string) AS employee_name#x, 
cast(department#x as string) AS department#x, cast(salary#x as int) AS salary#x]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- SubqueryAlias basic_pays
+               +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT department, mode(salary) FROM basic_pays GROUP BY department ORDER BY 
department
+-- !query analysis
+Sort [department#x ASC NULLS FIRST], true
++- Aggregate [department#x], [department#x, mode(salary#x, 0, 0, None) AS 
mode(salary)#x]
+   +- SubqueryAlias basic_pays
+      +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+         +- Project [cast(employee_name#x as string) AS employee_name#x, 
cast(department#x as string) AS department#x, cast(salary#x as int) AS salary#x]
+            +- Project [employee_name#x, department#x, salary#x]
+               +- SubqueryAlias basic_pays
+                  +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT department, mode(DISTINCT salary) FROM basic_pays GROUP BY department 
ORDER BY department
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 20,
+    "stopIndex" : 40,
+    "fragment" : "mode(DISTINCT salary)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, None) AS mode(col)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, None) AS mode(col)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY col 
DESC)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "functionName" : "`mode`",
+    "inputSql" : "\"true\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "2",
+    "requiredType" : "\"BOOLEAN\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 24,
+    "fragment" : "mode(col, 'true')"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col, null) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprName" : "`deterministic`",
+    "sqlExpr" : "\"NULL\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "mode(col, null)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS 
tab(col, b)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "NON_FOLDABLE_ARGUMENT",
+  "sqlState" : "42K08",
+  "messageParameters" : {
+    "funcName" : "`mode`",
+    "paramName" : "`deterministic`",
+    "paramType" : "\"BOOLEAN\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 19,
+    "fragment" : "mode(col, b)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, None) AS mode(col)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, None) AS mode(col)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY col 
DESC)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY col),
+  mode() WITHIN GROUP (ORDER BY col DESC)
+FROM VALUES (null), (null), (null) AS tab(col)
+-- !query analysis
+Aggregate [mode(col#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY col 
DESC)#x, mode(col#x, 0, 0, Some(false)) AS mode() WITHIN GROUP (ORDER BY col)#x]
++- SubqueryAlias tab
+   +- LocalRelation [col#x]
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary DESC)
+FROM basic_pays
+WHERE salary > 20000
+-- !query analysis
+Aggregate [mode(salary#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY 
salary DESC)#x, mode(salary#x, 0, 0, Some(false)) AS mode() WITHIN GROUP (ORDER 
BY salary)#x]
++- Filter (salary#x > 20000)
+   +- SubqueryAlias basic_pays
+      +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+         +- Project [cast(employee_name#x as string) AS employee_name#x, 
cast(department#x as string) AS department#x, cast(salary#x as int) AS salary#x]
+            +- Project [employee_name#x, department#x, salary#x]
+               +- SubqueryAlias basic_pays
+                  +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary DESC)
+FROM basic_pays
+-- !query analysis
+Aggregate [mode(salary#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY 
salary DESC)#x, mode(salary#x, 0, 0, Some(false)) AS mode() WITHIN GROUP (ORDER 
BY salary)#x]
++- SubqueryAlias basic_pays
+   +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+      +- Project [cast(employee_name#x as string) AS employee_name#x, 
cast(department#x as string) AS department#x, cast(salary#x as int) AS salary#x]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- SubqueryAlias basic_pays
+               +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE salary > 10000)
+FROM basic_pays
+-- !query analysis
+Aggregate [mode(salary#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY 
salary DESC)#x, mode(salary#x, 0, 0, Some(true)) FILTER (WHERE (salary#x > 
10000)) AS mode() WITHIN GROUP (ORDER BY salary DESC) FILTER (WHERE (salary > 
10000))#x]
++- SubqueryAlias basic_pays
+   +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+      +- Project [cast(employee_name#x as string) AS employee_name#x, 
cast(department#x as string) AS department#x, cast(salary#x as int) AS salary#x]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- SubqueryAlias basic_pays
+               +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT
+  department,
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE salary > 10000)
+FROM basic_pays
+GROUP BY department
+ORDER BY department
+-- !query analysis
+Sort [department#x ASC NULLS FIRST], true
++- Aggregate [department#x], [department#x, mode(salary#x, 0, 0, Some(true)) 
AS mode() WITHIN GROUP (ORDER BY salary DESC)#x, mode(salary#x, 0, 0, 
Some(true)) FILTER (WHERE (salary#x > 10000)) AS mode() WITHIN GROUP (ORDER BY 
salary DESC) FILTER (WHERE (salary > 10000))#x]
+   +- SubqueryAlias basic_pays
+      +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+         +- Project [cast(employee_name#x as string) AS employee_name#x, 
cast(department#x as string) AS department#x, cast(salary#x as int) AS salary#x]
+            +- Project [employee_name#x, department#x, salary#x]
+               +- SubqueryAlias basic_pays
+                  +- LocalRelation [employee_name#x, department#x, salary#x]
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department)
+FROM basic_pays
+ORDER BY salary
+-- !query analysis
+Sort [salary#x ASC NULLS FIRST], true
++- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING)#x]
+   +- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING)#x, mode() WITHIN GROUP (ORDER BY salary 
DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#x]
+      +- Window [mode(salary#x, 0, 0, Some(true)) 
windowspecdefinition(department#x, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), unboundedfollowing$())) AS mode() WITHIN GROUP (ORDER BY 
salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#x], [department#x]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- SubqueryAlias basic_pays
+               +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+                  +- Project [cast(employee_name#x as string) AS 
employee_name#x, cast(department#x as string) AS department#x, cast(salary#x as 
int) AS salary#x]
+                     +- Project [employee_name#x, department#x, salary#x]
+                        +- SubqueryAlias basic_pays
+                           +- LocalRelation [employee_name#x, department#x, 
salary#x]
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER 
BY salary)
+FROM basic_pays
+ORDER BY salary
+-- !query analysis
+Sort [salary#x ASC NULLS FIRST], true
++- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x]
+   +- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#x]
+      +- Window [mode(salary#x, 0, 0, Some(true)) 
windowspecdefinition(department#x, salary#x ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
mode() WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ORDER 
BY salary ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#x], [department#x], [salary#x ASC NULLS FIRST]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- SubqueryAlias basic_pays
+               +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+                  +- Project [cast(employee_name#x as string) AS 
employee_name#x, cast(department#x as string) AS department#x, cast(salary#x as 
int) AS salary#x]
+                     +- Project [employee_name#x, department#x, salary#x]
+                        +- SubqueryAlias basic_pays
+                           +- LocalRelation [employee_name#x, department#x, 
salary#x]
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS 
BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+FROM basic_pays
+ORDER BY salary
+-- !query analysis
+Sort [salary#x ASC NULLS FIRST], true
++- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING)#x]
+   +- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING)#x, mode() WITHIN GROUP (ORDER BY salary DESC) OVER 
(PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)#x]
+      +- Window [mode(salary#x, 0, 0, Some(true)) 
windowspecdefinition(department#x, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), 1)) AS mode() WITHIN GROUP (ORDER BY salary DESC) OVER 
(PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)#x], 
[department#x]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- SubqueryAlias basic_pays
+               +- View (`basic_pays`, [employee_name#x,department#x,salary#x])
+                  +- Project [cast(employee_name#x as string) AS 
employee_name#x, cast(department#x as string) AS department#x, cast(salary#x as 
int) AS salary#x]
+                     +- Project [employee_name#x, department#x, salary#x]
+                        +- SubqueryAlias basic_pays
+                           +- LocalRelation [employee_name#x, department#x, 
salary#x]
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER w
+FROM basic_pays
+WHERE salary > 8900
+WINDOW w AS (PARTITION BY department)
+ORDER BY salary
+-- !query analysis
+Sort [salary#x ASC NULLS FIRST], true
++- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING)#x]
+   +- Project [employee_name#x, department#x, salary#x, mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING)#x, mode() WITHIN GROUP (ORDER BY salary 
DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#x]
+      +- Window [mode(salary#x, 0, 0, Some(true)) 
windowspecdefinition(department#x, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), unboundedfollowing$())) AS mode() WITHIN GROUP (ORDER BY 
salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING)#x], [department#x]
+         +- Project [employee_name#x, department#x, salary#x]
+            +- Filter (salary#x > 8900)
+               +- SubqueryAlias basic_pays
+                  +- View (`basic_pays`, 
[employee_name#x,department#x,salary#x])
+                     +- Project [cast(employee_name#x as string) AS 
employee_name#x, cast(department#x as string) AS department#x, cast(salary#x as 
int) AS salary#x]
+                        +- Project [employee_name#x, department#x, salary#x]
+                           +- SubqueryAlias basic_pays
+                              +- LocalRelation [employee_name#x, department#x, 
salary#x]
+
+
+-- !query
+SELECT
+  mode(DISTINCT salary) WITHIN GROUP (ORDER BY salary)
+FROM basic_pays
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 61,
+    "fragment" : "mode(DISTINCT salary) WITHIN GROUP (ORDER BY salary)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  mode()
+FROM basic_pays
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 15,
+    "fragment" : "mode()"
+  } ]
+}
+
+
+-- !query
+SELECT
+  mode(salary) WITHIN GROUP (ORDER BY salary)
+FROM basic_pays
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "actualNum" : "1",
+    "expectedNum" : "0",
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 52,
+    "fragment" : "mode(salary) WITHIN GROUP (ORDER BY salary)"
+  } ]
+}
+
+
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW intervals AS SELECT * FROM VALUES
+(0, INTERVAL '0' MONTH, INTERVAL '0' SECOND, INTERVAL '0' MINUTE),
+(0, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(0, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(0, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(0, INTERVAL '40' MONTH, INTERVAL '40' SECOND, INTERVAL '40' MINUTE),
+(1, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(1, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(2, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '25' MONTH, INTERVAL '25' SECOND, INTERVAL '25' MINUTE),
+(2, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(3, INTERVAL '60' MONTH, INTERVAL '60' SECOND, INTERVAL '60' MINUTE),
+(4, null, null, null)
+AS intervals(k, dt, ym, dt2)
+-- !query analysis
+CreateViewCommand `intervals`, SELECT * FROM VALUES
+(0, INTERVAL '0' MONTH, INTERVAL '0' SECOND, INTERVAL '0' MINUTE),
+(0, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(0, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(0, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(0, INTERVAL '40' MONTH, INTERVAL '40' SECOND, INTERVAL '40' MINUTE),
+(1, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(1, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(2, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '25' MONTH, INTERVAL '25' SECOND, INTERVAL '25' MINUTE),
+(2, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(3, INTERVAL '60' MONTH, INTERVAL '60' SECOND, INTERVAL '60' MINUTE),
+(4, null, null, null)
+AS intervals(k, dt, ym, dt2), false, true, LocalTempView, true
+   +- Project [k#x, dt#x, ym#x, dt2#x]
+      +- SubqueryAlias intervals
+         +- LocalRelation [k#x, dt#x, ym#x, dt2#x]
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY dt),
+  mode() WITHIN GROUP (ORDER BY dt DESC)
+FROM intervals
+-- !query analysis
+Aggregate [mode(dt#x, 0, 0, Some(true)) AS mode() WITHIN GROUP (ORDER BY dt 
DESC)#x, mode(dt#x, 0, 0, Some(false)) AS mode() WITHIN GROUP (ORDER BY dt)#x]
++- SubqueryAlias intervals
+   +- View (`intervals`, [k#x,dt#x,ym#x,dt2#x])
+      +- Project [cast(k#x as int) AS k#x, cast(dt#x as interval month) AS 
dt#x, cast(ym#x as interval second) AS ym#x, cast(dt2#x as interval minute) AS 
dt2#x]
+         +- Project [k#x, dt#x, ym#x, dt2#x]
+            +- SubqueryAlias intervals
+               +- LocalRelation [k#x, dt#x, ym#x, dt2#x]
+
+
+-- !query
+SELECT
+  k,
+  mode() WITHIN GROUP (ORDER BY ym),
+  mode() WITHIN GROUP (ORDER BY dt DESC)
+FROM intervals
+GROUP BY k
+ORDER BY k
+-- !query analysis
+Sort [k#x ASC NULLS FIRST], true
++- Aggregate [k#x], [k#x, mode(ym#x, 0, 0, Some(true)) AS mode() WITHIN GROUP 
(ORDER BY ym DESC)#x, mode(dt#x, 0, 0, Some(false)) AS mode() WITHIN GROUP 
(ORDER BY dt)#x]
+   +- SubqueryAlias intervals
+      +- View (`intervals`, [k#x,dt#x,ym#x,dt2#x])
+         +- Project [cast(k#x as int) AS k#x, cast(dt#x as interval month) AS 
dt#x, cast(ym#x as interval second) AS ym#x, cast(dt2#x as interval minute) AS 
dt2#x]
+            +- Project [k#x, dt#x, ym#x, dt2#x]
+               +- SubqueryAlias intervals
+                  +- LocalRelation [k#x, dt#x, ym#x, dt2#x]
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql 
b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index 4b76510b65f2..ea1e2f323151 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -248,11 +248,6 @@ FROM VALUES
   (1,4),(2,3),(1,4),(2,4) AS v(a,b)
 GROUP BY a;
 
-
-SELECT mode(a), mode(b) FROM testData;
-SELECT a, mode(b) FROM testData GROUP BY a ORDER BY a;
-
-
 -- SPARK-44846: PushFoldableIntoBranches in complex grouping expressions cause 
bindReference error
 SELECT c * 2 AS d
 FROM (
@@ -264,14 +259,3 @@ FROM (
          GROUP BY b
      ) t3
 GROUP BY c;
-
--- SPARK-45034: Support deterministic mode function
-SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col);
-SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
-SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
-SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col);
-SELECT mode(col, null) FROM VALUES (-10), (0), (10) AS tab(col);
-SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS 
tab(col, b);
-SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col);
-SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col);
-SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/mode.sql 
b/sql/core/src/test/resources/sql-tests/inputs/mode.sql
new file mode 100644
index 000000000000..85035da7ad3f
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/mode.sql
@@ -0,0 +1,139 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW basic_pays AS SELECT * FROM VALUES
+('Diane Murphy','Accounting',8435),
+('Mary Patterson','Accounting',9998),
+('Jeff Firrelli','Accounting',8992),
+('William Patterson','Accounting',8870),
+('Gerard Bondur','Accounting',11472),
+('Anthony Bow','Accounting',6627),
+('Leslie Jennings','IT',8113),
+('Leslie Thompson','IT',5186),
+('Julie Firrelli','Sales',9181),
+('Steve Patterson','Sales',9441),
+('Foon Yue Tseng','Sales',6660),
+('George Vanauf','Sales',10563),
+('Loui Bondur','SCM',10449),
+('Gerard Hernandez','SCM',6949),
+('Pamela Castillo','SCM',11303),
+('Larry Bott','SCM',11798),
+('Barry Jones','SCM',10586)
+AS basic_pays(employee_name, department, salary);
+
+SELECT mode(department), mode(salary) FROM basic_pays;
+SELECT department, mode(salary) FROM basic_pays GROUP BY department ORDER BY 
department;
+SELECT department, mode(DISTINCT salary) FROM basic_pays GROUP BY department 
ORDER BY department;
+
+-- SPARK-45034: Support deterministic mode function
+SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col);
+SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col);
+SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col);
+SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col);
+SELECT mode(col, null) FROM VALUES (-10), (0), (10) AS tab(col);
+SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS 
tab(col, b);
+SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col);
+SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col);
+SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col);
+
+SELECT
+  mode() WITHIN GROUP (ORDER BY col),
+  mode() WITHIN GROUP (ORDER BY col DESC)
+FROM VALUES (null), (null), (null) AS tab(col);
+
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary DESC)
+FROM basic_pays
+WHERE salary > 20000;
+
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary DESC)
+FROM basic_pays;
+
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE salary > 10000)
+FROM basic_pays;
+
+SELECT
+  department,
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE salary > 10000)
+FROM basic_pays
+GROUP BY department
+ORDER BY department;
+
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department)
+FROM basic_pays
+ORDER BY salary;
+
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER 
BY salary)
+FROM basic_pays
+ORDER BY salary;
+
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS 
BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+FROM basic_pays
+ORDER BY salary;
+
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER w
+FROM basic_pays
+WHERE salary > 8900
+WINDOW w AS (PARTITION BY department)
+ORDER BY salary;
+
+SELECT
+  mode(DISTINCT salary) WITHIN GROUP (ORDER BY salary)
+FROM basic_pays;
+
+SELECT
+  mode()
+FROM basic_pays;
+
+SELECT
+  mode(salary) WITHIN GROUP (ORDER BY salary)
+FROM basic_pays;
+
+CREATE OR REPLACE TEMPORARY VIEW intervals AS SELECT * FROM VALUES
+(0, INTERVAL '0' MONTH, INTERVAL '0' SECOND, INTERVAL '0' MINUTE),
+(0, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(0, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(0, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(0, INTERVAL '40' MONTH, INTERVAL '40' SECOND, INTERVAL '40' MINUTE),
+(1, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(1, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(2, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '25' MONTH, INTERVAL '25' SECOND, INTERVAL '25' MINUTE),
+(2, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(3, INTERVAL '60' MONTH, INTERVAL '60' SECOND, INTERVAL '60' MINUTE),
+(4, null, null, null)
+AS intervals(k, dt, ym, dt2);
+
+SELECT
+  mode() WITHIN GROUP (ORDER BY dt),
+  mode() WITHIN GROUP (ORDER BY dt DESC)
+FROM intervals;
+
+SELECT
+  k,
+  mode() WITHIN GROUP (ORDER BY ym),
+  mode() WITHIN GROUP (ORDER BY dt DESC)
+FROM intervals
+GROUP BY k
+ORDER BY k;
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out 
b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index ac92c369de2e..e9addb963153 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1086,25 +1086,6 @@ 
struct<a:int,collect_list(b):array<int>,collect_list(b):array<int>>
 2      [3,4]   [3,4]
 
 
--- !query
-SELECT mode(a), mode(b) FROM testData
--- !query schema
-struct<mode(a, false):int,mode(b, false):int>
--- !query output
-3      1
-
-
--- !query
-SELECT a, mode(b) FROM testData GROUP BY a ORDER BY a
--- !query schema
-struct<a:int,mode(b, false):int>
--- !query output
-NULL   1
-1      1
-2      1
-3      1
-
-
 -- !query
 SELECT c * 2 AS d
 FROM (
@@ -1121,131 +1102,3 @@ struct<d:int>
 -- !query output
 0
 2
-
-
--- !query
-SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query schema
-struct<mode(col, false):int>
--- !query output
-0
-
-
--- !query
-SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query schema
-struct<mode(col, false):int>
--- !query output
-0
-
-
--- !query
-SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query schema
-struct<mode(col, true):int>
--- !query output
--10
-
-
--- !query
-SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col)
--- !query schema
-struct<>
--- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
-  "sqlState" : "42K09",
-  "messageParameters" : {
-    "inputSql" : "\"true\"",
-    "inputType" : "\"STRING\"",
-    "paramIndex" : "2",
-    "requiredType" : "\"BOOLEAN\"",
-    "sqlExpr" : "\"mode(col, true)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 8,
-    "stopIndex" : 24,
-    "fragment" : "mode(col, 'true')"
-  } ]
-}
-
-
--- !query
-SELECT mode(col, null) FROM VALUES (-10), (0), (10) AS tab(col)
--- !query schema
-struct<>
--- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
-  "sqlState" : "42K09",
-  "messageParameters" : {
-    "exprName" : "`deterministic`",
-    "sqlExpr" : "\"mode(col, NULL)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 8,
-    "stopIndex" : 22,
-    "fragment" : "mode(col, null)"
-  } ]
-}
-
-
--- !query
-SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS 
tab(col, b)
--- !query schema
-struct<>
--- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
-  "sqlState" : "42K09",
-  "messageParameters" : {
-    "inputExpr" : "\"b\"",
-    "inputName" : "`deterministic`",
-    "inputType" : "\"BOOLEAN\"",
-    "sqlExpr" : "\"mode(col, b)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 8,
-    "stopIndex" : 19,
-    "fragment" : "mode(col, b)"
-  } ]
-}
-
-
--- !query
-SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col)
--- !query schema
-struct<mode(col, false):map<int,string>>
--- !query output
-{1:"a"}
-
-
--- !query
-SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col)
--- !query schema
-struct<mode(col, false):map<int,string>>
--- !query output
-{1:"a"}
-
-
--- !query
-SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
--- !query schema
-struct<>
--- !query output
-org.apache.spark.SparkIllegalArgumentException
-{
-  "errorClass" : "_LEGACY_ERROR_TEMP_2005",
-  "messageParameters" : {
-    "dataType" : "PhysicalMapType"
-  }
-}
diff --git a/sql/core/src/test/resources/sql-tests/results/mode.sql.out 
b/sql/core/src/test/resources/sql-tests/results/mode.sql.out
new file mode 100644
index 000000000000..763e8d8c1909
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/mode.sql.out
@@ -0,0 +1,495 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW basic_pays AS SELECT * FROM VALUES
+('Diane Murphy','Accounting',8435),
+('Mary Patterson','Accounting',9998),
+('Jeff Firrelli','Accounting',8992),
+('William Patterson','Accounting',8870),
+('Gerard Bondur','Accounting',11472),
+('Anthony Bow','Accounting',6627),
+('Leslie Jennings','IT',8113),
+('Leslie Thompson','IT',5186),
+('Julie Firrelli','Sales',9181),
+('Steve Patterson','Sales',9441),
+('Foon Yue Tseng','Sales',6660),
+('George Vanauf','Sales',10563),
+('Loui Bondur','SCM',10449),
+('Gerard Hernandez','SCM',6949),
+('Pamela Castillo','SCM',11303),
+('Larry Bott','SCM',11798),
+('Barry Jones','SCM',10586)
+AS basic_pays(employee_name, department, salary)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT mode(department), mode(salary) FROM basic_pays
+-- !query schema
+struct<mode(department):string,mode(salary):int>
+-- !query output
+Accounting     10563
+
+
+-- !query
+SELECT department, mode(salary) FROM basic_pays GROUP BY department ORDER BY 
department
+-- !query schema
+struct<department:string,mode(salary):int>
+-- !query output
+Accounting     6627
+IT     5186
+SCM    11303
+Sales  10563
+
+
+-- !query
+SELECT department, mode(DISTINCT salary) FROM basic_pays GROUP BY department 
ORDER BY department
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 20,
+    "stopIndex" : 40,
+    "fragment" : "mode(DISTINCT salary)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col):int>
+-- !query output
+0
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode(col):int>
+-- !query output
+0
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<mode() WITHIN GROUP (ORDER BY col DESC):int>
+-- !query output
+-10
+
+
+-- !query
+SELECT mode(col, 'true') FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "functionName" : "`mode`",
+    "inputSql" : "\"true\"",
+    "inputType" : "\"STRING\"",
+    "paramIndex" : "2",
+    "requiredType" : "\"BOOLEAN\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 24,
+    "fragment" : "mode(col, 'true')"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col, null) FROM VALUES (-10), (0), (10) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprName" : "`deterministic`",
+    "sqlExpr" : "\"NULL\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "mode(col, null)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col, b) FROM VALUES (-10, false), (0, false), (10, false) AS 
tab(col, b)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "NON_FOLDABLE_ARGUMENT",
+  "sqlState" : "42K08",
+  "messageParameters" : {
+    "funcName" : "`mode`",
+    "paramName" : "`deterministic`",
+    "paramType" : "\"BOOLEAN\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 19,
+    "fragment" : "mode(col, b)"
+  } ]
+}
+
+
+-- !query
+SELECT mode(col) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<mode(col):map<int,string>>
+-- !query output
+{1:"a"}
+
+
+-- !query
+SELECT mode(col, false) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<mode(col):map<int,string>>
+-- !query output
+{1:"a"}
+
+
+-- !query
+SELECT mode(col, true) FROM VALUES (map(1, 'a')) AS tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_2005",
+  "messageParameters" : {
+    "dataType" : "PhysicalMapType"
+  }
+}
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY col),
+  mode() WITHIN GROUP (ORDER BY col DESC)
+FROM VALUES (null), (null), (null) AS tab(col)
+-- !query schema
+struct<mode() WITHIN GROUP (ORDER BY col DESC):void,mode() WITHIN GROUP (ORDER 
BY col):void>
+-- !query output
+NULL   NULL
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary DESC)
+FROM basic_pays
+WHERE salary > 20000
+-- !query schema
+struct<mode() WITHIN GROUP (ORDER BY salary DESC):int,mode() WITHIN GROUP 
(ORDER BY salary):int>
+-- !query output
+NULL   NULL
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary DESC)
+FROM basic_pays
+-- !query schema
+struct<mode() WITHIN GROUP (ORDER BY salary DESC):int,mode() WITHIN GROUP 
(ORDER BY salary):int>
+-- !query output
+5186   11798
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE salary > 10000)
+FROM basic_pays
+-- !query schema
+struct<mode() WITHIN GROUP (ORDER BY salary DESC):int,mode() WITHIN GROUP 
(ORDER BY salary DESC) FILTER (WHERE (salary > 10000)):int>
+-- !query output
+5186   10449
+
+
+-- !query
+SELECT
+  department,
+  mode() WITHIN GROUP (ORDER BY salary),
+  mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE salary > 10000)
+FROM basic_pays
+GROUP BY department
+ORDER BY department
+-- !query schema
+struct<department:string,mode() WITHIN GROUP (ORDER BY salary DESC):int,mode() 
WITHIN GROUP (ORDER BY salary DESC) FILTER (WHERE (salary > 10000)):int>
+-- !query output
+Accounting     6627    11472
+IT     5186    NULL
+SCM    6949    10449
+Sales  6660    10563
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department)
+FROM basic_pays
+ORDER BY salary
+-- !query schema
+struct<employee_name:string,department:string,salary:int,mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING):int>
+-- !query output
+Leslie Thompson        IT      5186    5186
+Anthony Bow    Accounting      6627    6627
+Foon Yue Tseng Sales   6660    6660
+Gerard Hernandez       SCM     6949    6949
+Leslie Jennings        IT      8113    5186
+Diane Murphy   Accounting      8435    6627
+William Patterson      Accounting      8870    6627
+Jeff Firrelli  Accounting      8992    6627
+Julie Firrelli Sales   9181    6660
+Steve Patterson        Sales   9441    6660
+Mary Patterson Accounting      9998    6627
+Loui Bondur    SCM     10449   6949
+George Vanauf  Sales   10563   6660
+Barry Jones    SCM     10586   6949
+Pamela Castillo        SCM     11303   6949
+Gerard Bondur  Accounting      11472   6627
+Larry Bott     SCM     11798   6949
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER 
BY salary)
+FROM basic_pays
+ORDER BY salary
+-- !query schema
+struct<employee_name:string,department:string,salary:int,mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
+-- !query output
+Leslie Thompson        IT      5186    5186
+Anthony Bow    Accounting      6627    6627
+Foon Yue Tseng Sales   6660    6660
+Gerard Hernandez       SCM     6949    6949
+Leslie Jennings        IT      8113    5186
+Diane Murphy   Accounting      8435    6627
+William Patterson      Accounting      8870    6627
+Jeff Firrelli  Accounting      8992    6627
+Julie Firrelli Sales   9181    6660
+Steve Patterson        Sales   9441    6660
+Mary Patterson Accounting      9998    6627
+Loui Bondur    SCM     10449   6949
+George Vanauf  Sales   10563   6660
+Barry Jones    SCM     10586   6949
+Pamela Castillo        SCM     11303   6949
+Gerard Bondur  Accounting      11472   6627
+Larry Bott     SCM     11798   6949
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS 
BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+FROM basic_pays
+ORDER BY salary
+-- !query schema
+struct<employee_name:string,department:string,salary:int,mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING):int>
+-- !query output
+Leslie Thompson        IT      5186    5186
+Anthony Bow    Accounting      6627    6627
+Foon Yue Tseng Sales   6660    6660
+Gerard Hernandez       SCM     6949    6949
+Leslie Jennings        IT      8113    5186
+Diane Murphy   Accounting      8435    8435
+William Patterson      Accounting      8870    8435
+Jeff Firrelli  Accounting      8992    8435
+Julie Firrelli Sales   9181    9181
+Steve Patterson        Sales   9441    6660
+Mary Patterson Accounting      9998    8435
+Loui Bondur    SCM     10449   6949
+George Vanauf  Sales   10563   6660
+Barry Jones    SCM     10586   6949
+Pamela Castillo        SCM     11303   6949
+Gerard Bondur  Accounting      11472   6627
+Larry Bott     SCM     11798   6949
+
+
+-- !query
+SELECT
+    employee_name,
+    department,
+    salary,
+    mode() WITHIN GROUP (ORDER BY salary) OVER w
+FROM basic_pays
+WHERE salary > 8900
+WINDOW w AS (PARTITION BY department)
+ORDER BY salary
+-- !query schema
+struct<employee_name:string,department:string,salary:int,mode() WITHIN GROUP 
(ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING):int>
+-- !query output
+Jeff Firrelli  Accounting      8992    8992
+Julie Firrelli Sales   9181    9181
+Steve Patterson        Sales   9441    9181
+Mary Patterson Accounting      9998    8992
+Loui Bondur    SCM     10449   10449
+George Vanauf  Sales   10563   9181
+Barry Jones    SCM     10586   10449
+Pamela Castillo        SCM     11303   10449
+Gerard Bondur  Accounting      11472   8992
+Larry Bott     SCM     11798   10449
+
+
+-- !query
+SELECT
+  mode(DISTINCT salary) WITHIN GROUP (ORDER BY salary)
+FROM basic_pays
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.DISTINCT_UNSUPPORTED",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 61,
+    "fragment" : "mode(DISTINCT salary) WITHIN GROUP (ORDER BY salary)"
+  } ]
+}
+
+
+-- !query
+SELECT
+  mode()
+FROM basic_pays
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WITHIN_GROUP_MISSING",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 15,
+    "fragment" : "mode()"
+  } ]
+}
+
+
+-- !query
+SELECT
+  mode(salary) WITHIN GROUP (ORDER BY salary)
+FROM basic_pays
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_INVERSE_DISTRIBUTION_FUNCTION.WRONG_NUM_ORDERINGS",
+  "sqlState" : "ID001",
+  "messageParameters" : {
+    "actualNum" : "1",
+    "expectedNum" : "0",
+    "funcName" : "`mode`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 10,
+    "stopIndex" : 52,
+    "fragment" : "mode(salary) WITHIN GROUP (ORDER BY salary)"
+  } ]
+}
+
+
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW intervals AS SELECT * FROM VALUES
+(0, INTERVAL '0' MONTH, INTERVAL '0' SECOND, INTERVAL '0' MINUTE),
+(0, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(0, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(0, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(0, INTERVAL '40' MONTH, INTERVAL '40' SECOND, INTERVAL '40' MINUTE),
+(1, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(1, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '10' MONTH, INTERVAL '10' SECOND, INTERVAL '10' MINUTE),
+(2, INTERVAL '20' MONTH, INTERVAL '20' SECOND, INTERVAL '20' MINUTE),
+(2, INTERVAL '25' MONTH, INTERVAL '25' SECOND, INTERVAL '25' MINUTE),
+(2, INTERVAL '30' MONTH, INTERVAL '30' SECOND, INTERVAL '30' MINUTE),
+(3, INTERVAL '60' MONTH, INTERVAL '60' SECOND, INTERVAL '60' MINUTE),
+(4, null, null, null)
+AS intervals(k, dt, ym, dt2)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT
+  mode() WITHIN GROUP (ORDER BY dt),
+  mode() WITHIN GROUP (ORDER BY dt DESC)
+FROM intervals
+-- !query schema
+struct<mode() WITHIN GROUP (ORDER BY dt DESC):interval month,mode() WITHIN 
GROUP (ORDER BY dt):interval month>
+-- !query output
+0-10   1-8
+
+
+-- !query
+SELECT
+  k,
+  mode() WITHIN GROUP (ORDER BY ym),
+  mode() WITHIN GROUP (ORDER BY dt DESC)
+FROM intervals
+GROUP BY k
+ORDER BY k
+-- !query schema
+struct<k:int,mode() WITHIN GROUP (ORDER BY ym DESC):interval second,mode() 
WITHIN GROUP (ORDER BY dt):interval month>
+-- !query output
+0      0 00:00:00.000000000    3-4
+1      0 00:00:10.000000000    1-8
+2      0 00:00:10.000000000    2-6
+3      0 00:01:00.000000000    5-0
+4      NULL    NULL


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to