This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 639301a933f [SPARK-37691][SQL] Support ANSI Aggregation Function: `percentile_disc`
639301a933f is described below

commit 639301a933f3f7b0a4cc2c1defb6c843afae180e
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Tue Apr 19 20:56:56 2022 +0800

    [SPARK-37691][SQL] Support ANSI Aggregation Function: `percentile_disc`

    ### What changes were proposed in this pull request?
    `PERCENTILE_DISC` is an ANSI aggregate function.

    Mainstream databases support `percentile_disc`, as shown below:
    **Postgresql**
    https://www.postgresql.org/docs/9.4/functions-aggregate.html
    **Teradata**
    https://docs.teradata.com/r/kmuOwjp1zEYg98JsB8fu_A/cPkFySIBORL~M938Zv07Cg
    **Snowflake**
    https://docs.snowflake.com/en/sql-reference/functions/percentile_disc.html
    **Oracle**
    https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/PERCENTILE_DISC.html#GUID-7C34FDDA-C241-474F-8C5C-50CC0182E005
    **DB2**
    https://www.ibm.com/docs/en/db2/11.5?topic=functions-percentile-disc
    **H2**
    http://www.h2database.com/html/functions-aggregate.html#percentile_disc
    **Sybase**
    https://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.infocenter.dc01776.1601/doc/html/san1278453110413.html
    **Exasol**
    https://docs.exasol.com/sql_references/functions/alphabeticallistfunctions/percentile_disc.htm
    **RedShift**
    https://docs.aws.amazon.com/redshift/latest/dg/r_APPROXIMATE_PERCENTILE_DISC.html
    **Yellowbrick**
    https://www.yellowbrick.com/docs/2.2/ybd_sqlref/percentile_disc.html
    **Mariadb**
    https://mariadb.com/kb/en/percentile_disc/
    **Phoenix**
    http://phoenix.incubator.apache.org/language/functions.html#percentile_disc
    **Singlestore**
    https://docs.singlestore.com/db/v7.6/en/reference/sql-reference/window-functions/percentile_disc.html

    This PR references the implementation of H2. Please refer to:
    https://github.com/h2database/h2database/blob/master/h2/src/main/org/h2/expression/aggregate/Percentile.java

    ### Why are the changes needed?
    `PERCENTILE_DISC` is very useful. Exposing the expression can make the migration from other systems to Spark SQL easier.

    ### Does this PR introduce _any_ user-facing change?
    'Yes'. New feature.

    ### How was this patch tested?
    New tests.

    Closes #35041 from beliefer/SPARK-37691.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-ref-ansi-compliance.md                    |   1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   3 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   7 +-
 .../expressions/aggregate/PercentileCont.scala     |  41 ----
 .../{Percentile.scala => percentiles.scala}        | 260 ++++++++++++++-------
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  18 +-
 .../expressions/aggregate/PercentileSuite.scala    |  13 +-
 .../sql/catalyst/parser/PlanParserSuite.scala      |  24 +-
 .../test/resources/sql-tests/inputs/group-by.sql   |  17 +-
 .../inputs/postgreSQL/aggregates_part4.sql         |   8 +-
 .../inputs/udf/postgreSQL/udf-aggregates_part4.sql |   8 +-
 .../src/test/resources/sql-tests/inputs/window.sql |  59 ++++-
 .../resources/sql-tests/results/group-by.sql.out   |  39 +++-
 .../results/postgreSQL/aggregates_part4.sql.out    |  31 ++-
 .../udf/postgreSQL/udf-aggregates_part4.sql.out    |  31 ++-
 .../resources/sql-tests/results/window.sql.out     | 184 +++++++++++----
 17 files changed, 540 insertions(+), 205 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 66161a112b1..89ba2d17608 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -512,6 +512,7 @@ Below is a list of all the keywords in Spark SQL.
 |PARTITIONS|non-reserved|non-reserved|non-reserved|
 |PERCENT|non-reserved|non-reserved|non-reserved|
 |PERCENTILE_CONT|reserved|non-reserved|non-reserved|
+|PERCENTILE_DISC|reserved|non-reserved|non-reserved|
 |PIVOT|non-reserved|non-reserved|non-reserved|
 |PLACING|non-reserved|non-reserved|non-reserved|
 |POSITION|non-reserved|non-reserved|reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index e84d4fa45eb..c5199a601ce 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -259,6 +259,7 @@ PARTITION: 'PARTITION';
 PARTITIONED: 'PARTITIONED';
 PARTITIONS: 'PARTITIONS';
 PERCENTILE_CONT: 'PERCENTILE_CONT';
+PERCENTILE_DISC: 'PERCENTILE_DISC';
 PERCENTLIT: 'PERCENT';
 PIVOT: 'PIVOT';
 PLACING: 'PLACING';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 872ea534ec3..6d465469f41 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -845,7 +845,7 @@ primaryExpression
        FROM srcStr=valueExpression RIGHT_PAREN                                    #trim
     | OVERLAY LEFT_PAREN input=valueExpression PLACING replace=valueExpression
       FROM position=valueExpression (FOR length=valueExpression)? RIGHT_PAREN     #overlay
-    | PERCENTILE_CONT LEFT_PAREN percentage=valueExpression RIGHT_PAREN
+    | name=(PERCENTILE_CONT | PERCENTILE_DISC) LEFT_PAREN percentage=valueExpression RIGHT_PAREN
       WITHIN GROUP LEFT_PAREN ORDER BY sortItem RIGHT_PAREN ( OVER windowSpec)?   #percentile
     ;
@@ -1466,6 +1466,7 @@ nonReserved
     | PARTITIONED
     | PARTITIONS
     | PERCENTILE_CONT
+    | PERCENTILE_DISC
     | PERCENTLIT
     | PIVOT
     | PLACING
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 936dadc78c1..e9cf758ede2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, PercentileCont}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -228,10 +228,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             // Only allow window functions with an aggregate expression or an offset window
             // function or a Pandas window UDF.
             w.windowFunction match {
-              case AggregateExpression(_: PercentileCont, _, _, _, _)
+              case agg @ AggregateExpression(_: PercentileCont | _: PercentileDisc, _, _, _, _)
                 if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification !=
                   SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) =>
-                failAnalysis("Cannot specify order by or frame for 'PERCENTILE_CONT'.")
+                failAnalysis(
+                  s"Cannot specify order by or frame for '${agg.aggregateFunction.prettyName}'.")
               case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
                   _: AggregateWindowFunction => // OK
               case f: PythonUDF if PythonUDF.isWindowPandasUDF(f) => // OK
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala
deleted file mode 100644
index facb269ffe7..00000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes, RuntimeReplaceableAggregate}
-import org.apache.spark.sql.catalyst.trees.BinaryLike
-import org.apache.spark.sql.types.AbstractDataType
-
-/**
- * Return a percentile value based on a continuous distribution of
- * numeric or ansi interval column at the given percentage (specified in ORDER BY clause).
- * The value of percentage must be between 0.0 and 1.0.
- */
-case class PercentileCont(left: Expression, right: Expression)
-  extends AggregateFunction
-  with RuntimeReplaceableAggregate
-  with ImplicitCastInputTypes
-  with BinaryLike[Expression] {
-  private lazy val percentile = new Percentile(left, right)
-  override def replacement: Expression = percentile
-  override def nodeName: String = "percentile_cont"
-  override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes
-  override protected def withNewChildrenInternal(
-      newLeft: Expression, newRight: Expression): PercentileCont =
-    this.copy(left = newLeft, right = newRight)
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
similarity index 70%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 91bd7aa08fc..d55f0b7a831 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -24,93 +24,36 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.trees.TernaryLike
+import org.apache.spark.sql.catalyst.trees.{BinaryLike, TernaryLike}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection.OpenHashMap
-/**
- * The Percentile aggregate function returns the exact percentile(s) of numeric column `expr` at
- * the given percentage(s) with value range in [0.0, 1.0].
- *
- * Because the number of elements and their partial order cannot be determined in advance.
- * Therefore we have to store all the elements in memory, and so notice that too many elements can
- * cause GC paused and eventually OutOfMemory Errors.
- *
- * @param child child expression that produce numeric column value with `child.eval(inputRow)`
- * @param percentageExpression Expression that represents a single percentage value or an array of
- *                             percentage values. Each percentage value must be in the range
- *                             [0.0, 1.0].
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage =
-    """
-      _FUNC_(col, percentage [, frequency]) - Returns the exact percentile value of numeric
-       or ansi interval column `col` at the given percentage. The value of percentage must be
-       between 0.0 and 1.0. The value of frequency should be positive integral
+abstract class PercentileBase extends TypedImperativeAggregate[OpenHashMap[AnyRef, Long]]
+  with ImplicitCastInputTypes {
-      _FUNC_(col, array(percentage1 [, percentage2]...) [, frequency]) - Returns the exact
-      percentile value array of numeric column `col` at the given percentage(s). Each value
-      of the percentage array must be between 0.0 and 1.0. The value of frequency should be
-      positive integral
+  val child: Expression
+  val percentageExpression: Expression
+  val frequencyExpression : Expression
-      """,
-  examples = """
-    Examples:
-      > SELECT _FUNC_(col, 0.3) FROM VALUES (0), (10) AS tab(col);
-       3.0
-      > SELECT _FUNC_(col, array(0.25, 0.75)) FROM VALUES (0), (10) AS tab(col);
-       [2.5,7.5]
-      > SELECT _FUNC_(col, 0.5) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
-       5.0
-      > SELECT _FUNC_(col, array(0.2, 0.5)) FROM VALUES (INTERVAL '0' SECOND), (INTERVAL '10' SECOND) AS tab(col);
-       [2000000.0,5000000.0]
-  """,
-  group = "agg_funcs",
-  since = "2.1.0")
-// scalastyle:on line.size.limit
-case class Percentile(
-    child: Expression,
-    percentageExpression: Expression,
-    frequencyExpression : Expression,
-    mutableAggBufferOffset: Int = 0,
-    inputAggBufferOffset: Int = 0)
-  extends TypedImperativeAggregate[OpenHashMap[AnyRef, Long]] with ImplicitCastInputTypes
-  with TernaryLike[Expression] {
+  // Whether to reverse calculate percentile value
+  val reverse: Boolean
-  def this(child: Expression, percentageExpression: Expression) = {
-    this(child, percentageExpression, Literal(1L), 0, 0)
-  }
-
-  def this(child: Expression, percentageExpression: Expression, frequency: Expression) = {
-    this(child, percentageExpression, frequency, 0, 0)
-  }
-
-  override def prettyName: String = "percentile"
-
-  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Percentile =
-    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
-
-  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): Percentile =
-    copy(inputAggBufferOffset = newInputAggBufferOffset)
+  // Whether the value is discrete
+  protected def discrete: Boolean
   // Mark as lazy so that percentageExpression is not evaluated during tree transformation.
   @transient
   private lazy val returnPercentileArray = percentageExpression.dataType.isInstanceOf[ArrayType]
   @transient
-  private lazy val percentages = percentageExpression.eval() match {
+  protected lazy val percentages = percentageExpression.eval() match {
     case null => null
     case num: Double => Array(num)
     case arrayData: ArrayData => arrayData.toDoubleArray()
   }
-  override def first: Expression = child
-  override def second: Expression = percentageExpression
-  override def third: Expression = frequencyExpression
-
   // Returns null for empty inputs
   override def nullable: Boolean = true
@@ -151,7 +94,7 @@ case class Percentile(
     }
   }
-  private def toDoubleValue(d: Any): Double = d match {
+  protected def toDoubleValue(d: Any): Double = d match {
     case d: Decimal => d.toDouble
     case n: Number => n.doubleValue
   }
@@ -204,7 +147,11 @@ case class Percentile(
       case intervalType: DayTimeIntervalType => intervalType.ordering
       case otherType => QueryExecutionErrors.unsupportedTypeError(otherType)
     }
-    val sortedCounts = buffer.toSeq.sortBy(_._1)(ordering.asInstanceOf[Ordering[AnyRef]])
+    val sortedCounts = if (reverse) {
+      buffer.toSeq.sortBy(_._1)(ordering.asInstanceOf[Ordering[AnyRef]].reverse)
+    } else {
+      buffer.toSeq.sortBy(_._1)(ordering.asInstanceOf[Ordering[AnyRef]])
+    }
     val accumulatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) {
       case ((key1, count1), (key2, count2)) => (key2, count1 + count2)
     }.tail
@@ -227,40 +174,44 @@ case class Percentile(
   /**
    * Get the percentile value.
-   *
    * This function has been based upon similar function from HIVE
    * `org.apache.hadoop.hive.ql.udf.UDAFPercentile.getPercentile()`.
    */
-  private def getPercentile(aggreCounts: Seq[(AnyRef, Long)], position: Double): Double = {
+  private def getPercentile(
+      accumulatedCounts: Seq[(AnyRef, Long)], position: Double): Double = {
     // We may need to do linear interpolation to get the exact percentile
     val lower = position.floor.toLong
     val higher = position.ceil.toLong
     // Use binary search to find the lower and the higher position.
-    val countsArray = aggreCounts.map(_._2).toArray[Long]
-    val lowerIndex = binarySearchCount(countsArray, 0, aggreCounts.size, lower + 1)
-    val higherIndex = binarySearchCount(countsArray, 0, aggreCounts.size, higher + 1)
+    val countsArray = accumulatedCounts.map(_._2).toArray[Long]
+    val lowerIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, lower + 1)
+    val higherIndex = binarySearchCount(countsArray, 0, accumulatedCounts.size, higher + 1)
-    val lowerKey = aggreCounts(lowerIndex)._1
+    val lowerKey = accumulatedCounts(lowerIndex)._1
     if (higher == lower) {
       // no interpolation needed because position does not have a fraction
       return toDoubleValue(lowerKey)
     }
-    val higherKey = aggreCounts(higherIndex)._1
+    val higherKey = accumulatedCounts(higherIndex)._1
     if (higherKey == lowerKey) {
       // no interpolation needed because lower position and higher position has the same key
       return toDoubleValue(lowerKey)
     }
-    // Linear interpolation to get the exact percentile
-    (higher - position) * toDoubleValue(lowerKey) + (position - lower) * toDoubleValue(higherKey)
+    if (discrete) {
+      toDoubleValue(lowerKey)
+    } else {
+      // Linear interpolation to get the exact percentile
+      (higher - position) * toDoubleValue(lowerKey) + (position - lower) * toDoubleValue(higherKey)
+    }
   }
   /**
    * use a binary search to find the index of the position closest to the current value.
    */
-  private def binarySearchCount(
+  protected def binarySearchCount(
       countsArray: Array[Long], start: Int, end: Int, value: Long): Int = {
     util.Arrays.binarySearch(countsArray, 0, end, value) match {
       case ix if ix < 0 => -(ix + 1)
@@ -317,6 +268,88 @@ case class Percentile(
       bis.close()
     }
   }
+}
+
+/**
+ * The Percentile aggregate function returns the exact percentile(s) of numeric column `expr` at
+ * the given percentage(s) with value range in [0.0, 1.0].
+ *
+ * Because the number of elements and their partial order cannot be determined in advance.
+ * Therefore we have to store all the elements in memory, and so notice that too many elements can
+ * cause GC paused and eventually OutOfMemory Errors.
+ *
+ * @param child child expression that produce numeric column value with `child.eval(inputRow)`
+ * @param percentageExpression Expression that represents a single percentage value or an array of
+ *                             percentage values. Each percentage value must be in the range
+ *                             [0.0, 1.0].
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, percentage [, frequency]) - Returns the exact percentile value of numeric
+       or ansi interval column `col` at the given percentage. The value of percentage must be
+       between 0.0 and 1.0. The value of frequency should be positive integral
+
+      _FUNC_(col, array(percentage1 [, percentage2]...) [, frequency]) - Returns the exact
+      percentile value array of numeric column `col` at the given percentage(s). Each value
+      of the percentage array must be between 0.0 and 1.0. The value of frequency should be
+      positive integral
+
+      """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col, 0.3) FROM VALUES (0), (10) AS tab(col);
+       3.0
+      > SELECT _FUNC_(col, array(0.25, 0.75)) FROM VALUES (0), (10) AS tab(col);
+       [2.5,7.5]
+      > SELECT _FUNC_(col, 0.5) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+       5.0
+      > SELECT _FUNC_(col, array(0.2, 0.5)) FROM VALUES (INTERVAL '0' SECOND), (INTERVAL '10' SECOND) AS tab(col);
+       [2000000.0,5000000.0]
+  """,
+  group = "agg_funcs",
+  since = "2.1.0")
+// scalastyle:on line.size.limit
+case class Percentile(
+    child: Expression,
+    percentageExpression: Expression,
+    frequencyExpression : Expression,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0,
+    reverse: Boolean = false) extends PercentileBase with TernaryLike[Expression] {
+
+  def this(child: Expression, percentageExpression: Expression) = {
+    this(child, percentageExpression, Literal(1L), 0, 0)
+  }
+
+  def this(child: Expression, percentageExpression: Expression, frequency: Expression) = {
+    this(child, percentageExpression, frequency, 0, 0)
+  }
+
+  def this(child: Expression, percentageExpression: Expression, reverse: Boolean) = {
+    this(child, percentageExpression, Literal(1L), reverse = reverse)
+  }
+
+  override def first: Expression = child
+  override def second: Expression = percentageExpression
+  override def third: Expression = frequencyExpression
+
+  override def prettyName: String = "percentile"
+
+  override def discrete: Boolean = false
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Percentile =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): Percentile =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override protected def stringArgs: Iterator[Any] = if (discrete) {
+    super.stringArgs ++ Some(discrete)
+  } else {
+    super.stringArgs
+  }
   override protected def withNewChildrenInternal(
       newFirst: Expression, newSecond: Expression, newThird: Expression): Percentile = copy(
@@ -325,3 +358,70 @@ case class Percentile(
     frequencyExpression = newThird
   )
 }
+
+/**
+ * Return a percentile value based on a continuous distribution of
+ * numeric or ansi interval column at the given percentage (specified in ORDER BY clause).
+ * The value of percentage must be between 0.0 and 1.0.
+ */
+case class PercentileCont(left: Expression, right: Expression, reverse: Boolean = false)
+  extends AggregateFunction
+  with RuntimeReplaceableAggregate
+  with ImplicitCastInputTypes
+  with BinaryLike[Expression] {
+  private lazy val percentile = new Percentile(left, right, reverse)
+  override def replacement: Expression = percentile
+  override def nodeName: String = "percentile_cont"
+  override def inputTypes: Seq[AbstractDataType] = percentile.inputTypes
+  override def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else ""
+    val direction = if (reverse) " DESC" else ""
+    s"$prettyName($distinct${right.sql}) WITHIN GROUP (ORDER BY v$direction)"
+  }
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): PercentileCont =
+    this.copy(left = newLeft, right = newRight)
+}
+
+/**
+ * The Percentile aggregate function returns the percentile(s) based on a discrete distribution of
+ * numeric column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+ *
+ * Because the number of elements and their partial order cannot be determined in advance.
+ * Therefore we have to store all the elements in memory, and so notice that too many elements can
+ * cause GC paused and eventually OutOfMemory Errors.
+ */
+case class PercentileDisc(
+    child: Expression,
+    percentageExpression: Expression,
+    reverse: Boolean = false,
+    mutableAggBufferOffset: Int = 0,
+    inputAggBufferOffset: Int = 0) extends PercentileBase with BinaryLike[Expression] {
+
+  val frequencyExpression: Expression = Literal(1L)
+
+  override def left: Expression = child
+  override def right: Expression = percentageExpression
+
+  override def prettyName: String = "percentile_disc"
+
+  override def discrete: Boolean = true
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): PercentileDisc =
+    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): PercentileDisc =
+    copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def sql(isDistinct: Boolean): String = {
+    val distinct = if (isDistinct) "DISTINCT " else ""
+    val direction = if (reverse) " DESC" else ""
+    s"$prettyName($distinct${right.sql}) WITHIN GROUP (ORDER BY v$direction)"
+  }
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): PercentileDisc = copy(
+    child = newLeft,
+    percentageExpression = newRight
+  )
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 83ac8054c3d..4ffb4f1cfe1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableId
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last, PercentileCont}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last, PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -1841,11 +1841,19 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   override def visitPercentile(ctx: PercentileContext): Expression = withOrigin(ctx) {
     val percentage = expression(ctx.percentage)
     val sortOrder = visitSortItem(ctx.sortItem)
-    val percentileCont = sortOrder.direction match {
-      case Ascending => PercentileCont(sortOrder.child, percentage)
-      case Descending => PercentileCont(sortOrder.child, Subtract(Literal(1), percentage))
+    val percentile = ctx.name.getType match {
+      case SqlBaseParser.PERCENTILE_CONT =>
+        sortOrder.direction match {
+          case Ascending => PercentileCont(sortOrder.child, percentage)
+          case Descending => PercentileCont(sortOrder.child, percentage, true)
+        }
+      case SqlBaseParser.PERCENTILE_DISC =>
+        sortOrder.direction match {
+          case Ascending => PercentileDisc(sortOrder.child, percentage)
+          case Descending => PercentileDisc(sortOrder.child, percentage, true)
+        }
     }
-    val aggregateExpression = percentileCont.toAggregateExpression()
+    val aggregateExpression = percentile.toAggregateExpression()
     ctx.windowSpec match {
       case spec: WindowRefContext =>
         UnresolvedWindowExpression(aggregateExpression, visitWindowRef(spec))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
index b5882b1ab40..e08c6279fd6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala
@@ -83,8 +83,8 @@ class PercentileSuite extends SparkFunSuite {
   }
   private def runTest(agg: Percentile,
-        rows : Seq[Seq[Any]],
-        expectedPercentiles : Seq[Double]): Unit = {
+      rows : Seq[Seq[Any]],
+      expectedPercentiles : Seq[Double]): Unit = {
     assert(agg.nullable)
     val group1 = (0 until rows.length / 2)
     val group1Buffer = agg.createAggregationBuffer()
@@ -218,7 +218,7 @@ class PercentileSuite extends SparkFunSuite {
       val percentile2 = new Percentile(child, percentage)
       assertEqual(percentile2.checkInputDataTypes(),
         TypeCheckFailure(s"Percentage(s) must be between 0.0 and 1.0, " +
-        s"but got ${percentage.simpleString(100)}"))
+          s"but got ${percentage.simpleString(100)}"))
     }
     val nonFoldablePercentage = Seq(NonFoldableLiteral(0.5),
@@ -270,7 +270,6 @@ class PercentileSuite extends SparkFunSuite {
   }
   test("nulls in percentage expression") {
-
     assert(new Percentile(
       AttributeReference("a", DoubleType)(),
       percentageExpression = Literal(null, DoubleType)).checkInputDataTypes() ===
@@ -280,9 +279,9 @@ class PercentileSuite extends SparkFunSuite {
       Seq(CreateArray(Seq(null).map(Literal(_))), CreateArray(Seq(0.1D, null).map(Literal(_))))
     nullPercentageExprs.foreach { percentageExpression =>
-        val wrongPercentage = new Percentile(
-          AttributeReference("a", DoubleType)(),
-          percentageExpression = percentageExpression)
+      val wrongPercentage = new Percentile(
+        AttributeReference("a", DoubleType)(),
+        percentageExpression = percentageExpression)
         assert(
           wrongPercentage.checkInputDataTypes() match {
             case TypeCheckFailure(msg) if msg.contains("argument 2 requires array<double>") => true
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 3d0d2fea7be..f511938f7aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, RelationTimeTravel, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.PercentileCont
+import org.apache.spark.sql.catalyst.expressions.aggregate.{PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
@@ -1299,24 +1299,36 @@ class PlanParserSuite extends AnalysisTest {
       "timestamp expression cannot contain subqueries")
   }
-  test("PERCENTILE_CONT function") {
-    def assertPercentileContPlans(inputSQL: String, expectedExpression: Expression): Unit = {
+  test("PERCENTILE_CONT & PERCENTILE_DISC") {
+    def assertPercentilePlans(inputSQL: String, expectedExpression: Expression): Unit = {
       comparePlans(
         parsePlan(inputSQL),
         Project(Seq(UnresolvedAlias(expectedExpression)), OneRowRelation())
       )
     }
-    assertPercentileContPlans(
+    assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col)",
       PercentileCont(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
         .toAggregateExpression()
     )
-    assertPercentileContPlans(
+    assertPercentilePlans(
       "SELECT PERCENTILE_CONT(0.1) WITHIN GROUP (ORDER BY col DESC)",
       PercentileCont(UnresolvedAttribute("col"),
-        Subtract(Literal(1), Literal(Decimal(0.1), DecimalType(1, 1)))).toAggregateExpression()
+        Literal(Decimal(0.1), DecimalType(1, 1)), true).toAggregateExpression()
+    )
+
+    assertPercentilePlans(
+      "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col)",
+      PercentileDisc(UnresolvedAttribute("col"), Literal(Decimal(0.1), DecimalType(1, 1)))
+        .toAggregateExpression()
+    )
+
+    assertPercentilePlans(
+      "SELECT PERCENTILE_DISC(0.1) WITHIN GROUP (ORDER BY col DESC)",
+      new PercentileDisc(UnresolvedAttribute("col"),
+        Literal(Decimal(0.1), DecimalType(1, 1)), true).toAggregateExpression()
     )
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index 0fc9366a92a..dd10d7b5d1c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -278,8 +278,8 @@ SELECT k, regr_sxx(y, x) FROM testRegression WHERE x IS NOT NULL AND y IS NOT NU
 -- SPARK-37676: Support ANSI Aggregation Function: percentile_cont
 SELECT
- percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
- percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC)
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC)
 FROM aggr;
 SELECT
   k,
@@ -288,3 +288,16 @@ SELECT
 FROM aggr
 GROUP BY k
 ORDER BY k;
+
+-- SPARK-37691: Support ANSI Aggregation Function: percentile_disc
+SELECT
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC)
+FROM aggr;
+SELECT
+  k,
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC)
+FROM aggr
+GROUP BY k
+ORDER BY k;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql
index 0d255bed24e..3cf57b1f0e5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part4.sql
@@ -33,10 +33,10 @@
 -- (values (0::float8),(0.1),(0.25),(0.4),(0.5),(0.6),(0.75),(0.9),(1)) v(p)
 -- group by p order by p;
--- select percentile_cont(0.5) within group (order by b) from aggtest;
--- select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
--- select percentile_cont(0.5) within group (order by thousand) from tenk1;
--- select percentile_disc(0.5) within group (order by thousand) from tenk1;
+select percentile_cont(0.5) within group (order by b) from aggtest;
+select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
+select percentile_cont(0.5) within group (order by thousand) from tenk1;
+select percentile_disc(0.5) within group (order by thousand) from tenk1;
 -- [SPARK-28661] Hypothetical-Set Aggregate Functions
 -- select rank(3) within group (order by x)
 -- from (values (1),(1),(2),(2),(3),(3),(4)) v(x);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql
index 8aea00073ee..dd14a7db0db 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/udf/postgreSQL/udf-aggregates_part4.sql
@@ -30,10 +30,10 @@
 -- (values (0::float8),(0.1),(0.25),(0.4),(0.5),(0.6),(0.75),(0.9),(1)) v(p)
 -- group by p order by p;
--- select percentile_cont(0.5) within group (order by b) from aggtest;
--- select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
--- select percentile_cont(0.5) within group (order by thousand) from tenk1;
--- select percentile_disc(0.5) within group (order by thousand) from tenk1;
+select percentile_cont(0.5) within group (order by b) from aggtest;
+select percentile_cont(0.5) within group (order by b), sum(b) from aggtest;
+select percentile_cont(0.5) within group (order by thousand) from tenk1;
+select percentile_disc(0.5) within group (order by thousand) from tenk1;
 -- [SPARK-28661] Hypothetical-Set Aggregate Functions
 -- select rank(3) within group (order by x)
 -- from (values (1),(1),(2),(2),(3),(3),(4)) v(x);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index 29dc6980b65..e982683250c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -446,7 +446,9 @@ SELECT
   department,
   salary,
   percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
-  percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department)
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department)
 FROM basic_pays
 ORDER BY salary;
@@ -459,6 +461,15 @@ SELECT
 FROM basic_pays
 ORDER BY salary;
+SELECT
+  employee_name,
+  department,
+  salary,
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER BY salary),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary)
+FROM basic_pays
+ORDER BY salary;
+
 SELECT
   employee_name,
   department,
@@ -468,16 +479,40 @@ SELECT
 FROM basic_pays
 ORDER BY salary;
+SELECT
+  employee_name,
+  department,
+  salary,
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING),
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
+FROM basic_pays
+ORDER BY salary;
+
 SELECT
   employee_name,
   department,
   salary,
   percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
-  percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w,
+  percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w,
+  percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w
 FROM basic_pays
 WINDOW w AS (PARTITION BY department)
 ORDER BY salary;
+SELECT + employee_name, + department, + salary, + percentile_cont(0.5) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_disc(0.5) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_cont(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w, + percentile_disc(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w +FROM basic_pays +WHERE salary > 8900 +WINDOW w AS (PARTITION BY department) +ORDER BY salary; + SELECT employee_name, department, @@ -488,6 +523,16 @@ FROM basic_pays WINDOW w AS (PARTITION BY department ORDER BY salary) ORDER BY salary; +SELECT + employee_name, + department, + salary, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w +FROM basic_pays +WINDOW w AS (PARTITION BY department ORDER BY salary) +ORDER BY salary; + SELECT employee_name, department, @@ -497,3 +542,13 @@ SELECT FROM basic_pays WINDOW w AS (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) ORDER BY salary; + +SELECT + employee_name, + department, + salary, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w +FROM basic_pays +WINDOW w AS (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) +ORDER BY salary; diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 75bf4f1c2f7..bad431383ff 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 103 +-- Number of queries: 105 -- !query @@ -1007,11 +1007,11 @@ struct<k:int,regr_sxx(y, x):double> -- !query SELECT - percentile_cont(0.25) WITHIN GROUP (ORDER BY v), - percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC) + percentile_cont(0.25) WITHIN GROUP (ORDER BY v), + 
percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC) FROM aggr -- !query schema -struct<percentile_cont(v, 0.25):double,percentile_cont(v, (1 - 0.25)):double> +struct<percentile_cont(0.25) WITHIN GROUP (ORDER BY v):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC):double> -- !query output 10.0 30.0 @@ -1025,10 +1025,39 @@ FROM aggr GROUP BY k ORDER BY k -- !query schema -struct<k:int,percentile_cont(v, 0.25):double,percentile_cont(v, (1 - 0.25)):double> +struct<k:int,percentile_cont(0.25) WITHIN GROUP (ORDER BY v):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC):double> -- !query output 0 10.0 30.0 1 12.5 17.5 2 17.5 26.25 3 60.0 60.0 4 NULL NULL + + +-- !query +SELECT + percentile_disc(0.25) WITHIN GROUP (ORDER BY v), + percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC) +FROM aggr +-- !query schema +struct<percentile_disc(0.25) WITHIN GROUP (ORDER BY v):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC):double> +-- !query output +10.0 30.0 + + +-- !query +SELECT + k, + percentile_disc(0.25) WITHIN GROUP (ORDER BY v), + percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC) +FROM aggr +GROUP BY k +ORDER BY k +-- !query schema +struct<k:int,percentile_disc(0.25) WITHIN GROUP (ORDER BY v):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v DESC):double> +-- !query output +0 10.0 30.0 +1 10.0 20.0 +2 10.0 30.0 +3 60.0 60.0 +4 NULL NULL diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out index b7bbdc50dfb..8c21a5067bf 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part4.sql.out @@ -1,5 +1,34 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 0 +-- Number of queries: 4 +-- !query +select percentile_cont(0.5) within group (order by b) from aggtest +-- !query schema 
+struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double> +-- !query output +53.44850015640259 + +-- !query +select percentile_cont(0.5) within group (order by b), sum(b) from aggtest +-- !query schema +struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double,sum(b):double> +-- !query output +53.44850015640259 431.77260909229517 + + +-- !query +select percentile_cont(0.5) within group (order by thousand) from tenk1 +-- !query schema +struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double> +-- !query output +499.5 + + +-- !query +select percentile_disc(0.5) within group (order by thousand) from tenk1 +-- !query schema +struct<percentile_disc(0.5) WITHIN GROUP (ORDER BY v):double> +-- !query output +499.0 diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out index b7bbdc50dfb..8c21a5067bf 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part4.sql.out @@ -1,5 +1,34 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 0 +-- Number of queries: 4 +-- !query +select percentile_cont(0.5) within group (order by b) from aggtest +-- !query schema +struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double> +-- !query output +53.44850015640259 + +-- !query +select percentile_cont(0.5) within group (order by b), sum(b) from aggtest +-- !query schema +struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double,sum(b):double> +-- !query output +53.44850015640259 431.77260909229517 + + +-- !query +select percentile_cont(0.5) within group (order by thousand) from tenk1 +-- !query schema +struct<percentile_cont(0.5) WITHIN GROUP (ORDER BY v):double> +-- !query output +499.5 + + +-- !query +select percentile_disc(0.5) within group (order by thousand) from tenk1 +-- !query 
schema +struct<percentile_disc(0.5) WITHIN GROUP (ORDER BY v):double> +-- !query output +499.0 diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index bac00d85c67..7e0fd2772d8 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 61 +-- Number of queries: 66 -- !query @@ -1205,29 +1205,31 @@ SELECT department, salary, percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department), - percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department) + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department), + percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department), + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department) FROM basic_pays ORDER BY salary -- !query schema -struct<employee_name:string,department:string,salary:int,percentile_cont(salary, 0.25) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(salary, (1 - 0.25)) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double> +struct<employee_name:string,department:string,salary:int,percentile_cont(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,p [...] 
-- !query output -Leslie Thompson IT 5186 5917.75 7381.25 -Anthony Bow Accounting 6627 8543.75 9746.5 -Foon Yue Tseng Sales 6660 8550.75 9721.5 -Gerard Hernandez SCM 6949 10449.0 11303.0 -Leslie Jennings IT 8113 5917.75 7381.25 -Diane Murphy Accounting 8435 8543.75 9746.5 -William Patterson Accounting 8870 8543.75 9746.5 -Jeff Firrelli Accounting 8992 8543.75 9746.5 -Julie Firrelli Sales 9181 8550.75 9721.5 -Steve Patterson Sales 9441 8550.75 9721.5 -Mary Patterson Accounting 9998 8543.75 9746.5 -Loui Bondur SCM 10449 10449.0 11303.0 -George Vanauf Sales 10563 8550.75 9721.5 -Barry Jones SCM 10586 10449.0 11303.0 -Pamela Castillo SCM 11303 10449.0 11303.0 -Gerard Bondur Accounting 11472 8543.75 9746.5 -Larry Bott SCM 11798 10449.0 11303.0 +Leslie Thompson IT 5186 5917.75 5186.0 7381.25 8113.0 +Anthony Bow Accounting 6627 8543.75 8435.0 9746.5 9998.0 +Foon Yue Tseng Sales 6660 8550.75 6660.0 9721.5 10563.0 +Gerard Hernandez SCM 6949 10449.0 10449.0 11303.0 11303.0 +Leslie Jennings IT 8113 5917.75 5186.0 7381.25 8113.0 +Diane Murphy Accounting 8435 8543.75 8435.0 9746.5 9998.0 +William Patterson Accounting 8870 8543.75 8435.0 9746.5 9998.0 +Jeff Firrelli Accounting 8992 8543.75 8435.0 9746.5 9998.0 +Julie Firrelli Sales 9181 8550.75 6660.0 9721.5 10563.0 +Steve Patterson Sales 9441 8550.75 6660.0 9721.5 10563.0 +Mary Patterson Accounting 9998 8543.75 8435.0 9746.5 9998.0 +Loui Bondur SCM 10449 10449.0 10449.0 11303.0 11303.0 +George Vanauf Sales 10563 8550.75 6660.0 9721.5 10563.0 +Barry Jones SCM 10586 10449.0 10449.0 11303.0 11303.0 +Pamela Castillo SCM 11303 10449.0 10449.0 11303.0 11303.0 +Gerard Bondur Accounting 11472 8543.75 8435.0 9746.5 9998.0 +Larry Bott SCM 11798 10449.0 10449.0 11303.0 11303.0 -- !query @@ -1243,7 +1245,23 @@ ORDER BY salary struct<> -- !query output org.apache.spark.sql.AnalysisException -Cannot specify order by or frame for 'PERCENTILE_CONT'. +Cannot specify order by or frame for 'percentile_cont'. 
+ + +-- !query +SELECT + employee_name, + department, + salary, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ORDER BY salary), + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ORDER BY salary) +FROM basic_pays +ORDER BY salary +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Cannot specify order by or frame for 'percentile_disc'. -- !query @@ -1259,7 +1277,23 @@ ORDER BY salary struct<> -- !query output org.apache.spark.sql.AnalysisException -Cannot specify order by or frame for 'PERCENTILE_CONT'. +Cannot specify order by or frame for 'percentile_cont'. + + +-- !query +SELECT + employee_name, + department, + salary, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING), + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) +FROM basic_pays +ORDER BY salary +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Cannot specify order by or frame for 'percentile_disc'. 
-- !query @@ -1268,30 +1302,60 @@ SELECT department, salary, percentile_cont(0.25) WITHIN GROUP (ORDER BY salary) OVER w, - percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_cont(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w FROM basic_pays WINDOW w AS (PARTITION BY department) ORDER BY salary -- !query schema -struct<employee_name:string,department:string,salary:int,percentile_cont(salary, 0.25) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(salary, (1 - 0.25)) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double> +struct<employee_name:string,department:string,salary:int,percentile_cont(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_disc(0.25) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(0.25) WITHIN GROUP (ORDER BY v DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,p [...] 
-- !query output -Leslie Thompson IT 5186 5917.75 7381.25 -Anthony Bow Accounting 6627 8543.75 9746.5 -Foon Yue Tseng Sales 6660 8550.75 9721.5 -Gerard Hernandez SCM 6949 10449.0 11303.0 -Leslie Jennings IT 8113 5917.75 7381.25 -Diane Murphy Accounting 8435 8543.75 9746.5 -William Patterson Accounting 8870 8543.75 9746.5 -Jeff Firrelli Accounting 8992 8543.75 9746.5 -Julie Firrelli Sales 9181 8550.75 9721.5 -Steve Patterson Sales 9441 8550.75 9721.5 -Mary Patterson Accounting 9998 8543.75 9746.5 -Loui Bondur SCM 10449 10449.0 11303.0 -George Vanauf Sales 10563 8550.75 9721.5 -Barry Jones SCM 10586 10449.0 11303.0 -Pamela Castillo SCM 11303 10449.0 11303.0 -Gerard Bondur Accounting 11472 8543.75 9746.5 -Larry Bott SCM 11798 10449.0 11303.0 +Leslie Thompson IT 5186 5917.75 5186.0 7381.25 8113.0 +Anthony Bow Accounting 6627 8543.75 8435.0 9746.5 9998.0 +Foon Yue Tseng Sales 6660 8550.75 6660.0 9721.5 10563.0 +Gerard Hernandez SCM 6949 10449.0 10449.0 11303.0 11303.0 +Leslie Jennings IT 8113 5917.75 5186.0 7381.25 8113.0 +Diane Murphy Accounting 8435 8543.75 8435.0 9746.5 9998.0 +William Patterson Accounting 8870 8543.75 8435.0 9746.5 9998.0 +Jeff Firrelli Accounting 8992 8543.75 8435.0 9746.5 9998.0 +Julie Firrelli Sales 9181 8550.75 6660.0 9721.5 10563.0 +Steve Patterson Sales 9441 8550.75 6660.0 9721.5 10563.0 +Mary Patterson Accounting 9998 8543.75 8435.0 9746.5 9998.0 +Loui Bondur SCM 10449 10449.0 10449.0 11303.0 11303.0 +George Vanauf Sales 10563 8550.75 6660.0 9721.5 10563.0 +Barry Jones SCM 10586 10449.0 10449.0 11303.0 11303.0 +Pamela Castillo SCM 11303 10449.0 10449.0 11303.0 11303.0 +Gerard Bondur Accounting 11472 8543.75 8435.0 9746.5 9998.0 +Larry Bott SCM 11798 10449.0 10449.0 11303.0 11303.0 + + +-- !query +SELECT + employee_name, + department, + salary, + percentile_cont(0.5) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_disc(0.5) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_cont(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w, + 
percentile_disc(0.5) WITHIN GROUP (ORDER BY salary DESC) OVER w +FROM basic_pays +WHERE salary > 8900 +WINDOW w AS (PARTITION BY department) +ORDER BY salary +-- !query schema +struct<employee_name:string,department:string,salary:int,percentile_cont(0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_disc(0.5) WITHIN GROUP (ORDER BY v) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,percentile_cont(0.5) WITHIN GROUP (ORDER BY v DESC) OVER (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double,perc [...] +-- !query output +Jeff Firrelli Accounting 8992 9998.0 9998.0 9998.0 9998.0 +Julie Firrelli Sales 9181 9441.0 9441.0 9441.0 9441.0 +Steve Patterson Sales 9441 9441.0 9441.0 9441.0 9441.0 +Mary Patterson Accounting 9998 9998.0 9998.0 9998.0 9998.0 +Loui Bondur SCM 10449 10944.5 10586.0 10944.5 11303.0 +George Vanauf Sales 10563 9441.0 9441.0 9441.0 9441.0 +Barry Jones SCM 10586 10944.5 10586.0 10944.5 11303.0 +Pamela Castillo SCM 11303 10944.5 10586.0 10944.5 11303.0 +Gerard Bondur Accounting 11472 9998.0 9998.0 9998.0 9998.0 +Larry Bott SCM 11798 10944.5 10586.0 10944.5 11303.0 -- !query @@ -1308,7 +1372,24 @@ ORDER BY salary struct<> -- !query output org.apache.spark.sql.AnalysisException -Cannot specify order by or frame for 'PERCENTILE_CONT'. +Cannot specify order by or frame for 'percentile_cont'. + + +-- !query +SELECT + employee_name, + department, + salary, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w +FROM basic_pays +WINDOW w AS (PARTITION BY department ORDER BY salary) +ORDER BY salary +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Cannot specify order by or frame for 'percentile_disc'. 
-- !query @@ -1325,4 +1406,21 @@ ORDER BY salary struct<> -- !query output org.apache.spark.sql.AnalysisException -Cannot specify order by or frame for 'PERCENTILE_CONT'. +Cannot specify order by or frame for 'percentile_cont'. + + +-- !query +SELECT + employee_name, + department, + salary, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) OVER w, + percentile_disc(0.25) WITHIN GROUP (ORDER BY salary DESC) OVER w +FROM basic_pays +WINDOW w AS (PARTITION BY department ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) +ORDER BY salary +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Cannot specify order by or frame for 'percentile_disc'.
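Note on the expected values in window.sql.out: the percentile_disc and percentile_cont columns can be cross-checked by hand. The sketch below is not Spark's implementation (the PR says it follows H2's Percentile.java); it is a minimal Python model that assumes the textbook definitions: percentile_disc returns the first value in sort order whose cumulative distribution is at least p, and percentile_cont interpolates linearly at position p * (n - 1). The function names and the `pays` dictionary are made up for illustration; the salaries are copied from the basic_pays rows in the golden output above.

```python
import math

# Salaries per department, copied from the basic_pays rows in window.sql.out.
pays = {
    "IT": [5186, 8113],
    "Accounting": [6627, 8435, 8870, 8992, 9998, 11472],
    "Sales": [6660, 9181, 9441, 10563],
    "SCM": [6949, 10449, 10586, 11303, 11798],
}

def percentile_disc(values, p, descending=False):
    """First value in sort order whose cumulative distribution k/n is >= p."""
    if not values:
        return None  # empty group yields NULL
    ordered = sorted(values, reverse=descending)
    n = len(ordered)
    for k, v in enumerate(ordered, start=1):
        if k / n >= p:
            return float(v)
    return float(ordered[-1])

def percentile_cont(values, p, descending=False):
    """Linear interpolation between the two values around position p * (n - 1)."""
    if not values:
        return None  # empty group yields NULL
    ordered = sorted(values, reverse=descending)
    pos = p * (len(ordered) - 1)
    lo, hi = math.floor(pos), math.ceil(pos)
    return float(ordered[lo] + (pos - lo) * (ordered[hi] - ordered[lo]))

if __name__ == "__main__":
    for dept, salaries in pays.items():
        print(
            dept,
            percentile_cont(salaries, 0.25),
            percentile_disc(salaries, 0.25),
            percentile_cont(salaries, 0.25, descending=True),
            percentile_disc(salaries, 0.25, descending=True),
        )
```

Under these assumptions the four columns for each department agree with the PARTITION BY department results for p = 0.25 listed above, for example 8543.75, 8435.0, 9746.5, 9998.0 for Accounting. The discrete variant always returns a value that is present in the input, while the continuous variant may return an interpolated one.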