This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 054522b6762 [SPARK-42520][CONNECT] Support basic Window API in Scala client 054522b6762 is described below commit 054522b67626aa1515b8f3f164ba7c063c38e5b8 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Wed Feb 22 15:19:00 2023 +0800 [SPARK-42520][CONNECT] Support basic Window API in Scala client ### What changes were proposed in this pull request? Support Window orderby, partitionby, rowsbetween/rangebetween. ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40107 from amaliujia/rw-window-2. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../main/scala/org/apache/spark/sql/Column.scala | 33 +++ .../org/apache/spark/sql/expressions/Window.scala | 241 +++++++++++++++++++++ .../apache/spark/sql/expressions/WindowSpec.scala | 240 ++++++++++++++++++++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 11 + .../query-tests/explain-results/window.explain | 8 + .../test/resources/query-tests/queries/window.json | 205 ++++++++++++++++++ .../resources/query-tests/queries/window.proto.bin | 43 ++++ 7 files changed, 781 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala index c3e1113aa45..fde17963bfd 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala @@ -24,6 +24,7 @@ import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connect.common.DataTypeProtoConverter +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.lit 
import org.apache.spark.sql.types.{DataType, Metadata} @@ -1233,6 +1234,38 @@ class Column private[sql] (private[sql] val expr: proto.Expression) extends Logg * @since 3.4.0 */ def bitwiseXOR(other: Any): Column = fn("^", other) + + /** + * Defines a windowing column. + * + * {{{ + * val w = Window.partitionBy("name").orderBy("id") + * df.select( + * sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)), + * avg("price").over(w.rowsBetween(Window.currentRow, 4)) + * ) + * }}} + * + * @group expr_ops + * @since 3.4.0 + */ + def over(window: expressions.WindowSpec): Column = window.withAggregate(this) + + /** + * Defines an empty analytic clause. In this case the analytic function is applied and presented + * for all rows in the result set. + * + * {{{ + * df.select( + * sum("price").over(), + * avg("price").over() + * ) + * }}} + * + * @group expr_ops + * @since 3.4.0 + */ + def over(): Column = over(Window.spec) } private[sql] object Column { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala new file mode 100644 index 00000000000..c85e7bc9c5c --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.expressions + +import org.apache.spark.annotation.Stable +import org.apache.spark.sql.Column + +/** + * Utility functions for defining window in DataFrames. + * + * {{{ + * // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + * Window.partitionBy("country").orderBy("date") + * .rowsBetween(Window.unboundedPreceding, Window.currentRow) + * + * // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING + * Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3) + * }}} + * + * @note + * When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, + * unboundedFollowing) is used by default. When ordering is defined, a growing window frame + * (rangeFrame, unboundedPreceding, currentRow) is used by default. + * + * @since 3.4.0 + */ +@Stable +object Window { + + /** + * Creates a [[WindowSpec]] with the partitioning defined. + * @since 3.4.0 + */ + @scala.annotation.varargs + def partitionBy(colName: String, colNames: String*): WindowSpec = { + spec.partitionBy(colName, colNames: _*) + } + + /** + * Creates a [[WindowSpec]] with the partitioning defined. + * @since 3.4.0 + */ + @scala.annotation.varargs + def partitionBy(cols: Column*): WindowSpec = { + spec.partitionBy(cols: _*) + } + + /** + * Creates a [[WindowSpec]] with the ordering defined. 
+ * @since 3.4.0 + */ + @scala.annotation.varargs + def orderBy(colName: String, colNames: String*): WindowSpec = { + spec.orderBy(colName, colNames: _*) + } + + /** + * Creates a [[WindowSpec]] with the ordering defined. + * @since 3.4.0 + */ + @scala.annotation.varargs + def orderBy(cols: Column*): WindowSpec = { + spec.orderBy(cols: _*) + } + + /** + * Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING" in + * SQL. This can be used to specify the frame boundaries: + * + * {{{ + * Window.rowsBetween(Window.unboundedPreceding, Window.currentRow) + * }}} + * + * @since 3.4.0 + */ + def unboundedPreceding: Long = Long.MinValue + + /** + * Value representing the last row in the partition, equivalent to "UNBOUNDED FOLLOWING" in SQL. + * This can be used to specify the frame boundaries: + * + * {{{ + * Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + * }}} + * + * @since 3.4.0 + */ + def unboundedFollowing: Long = Long.MaxValue + + /** + * Value representing the current row. This can be used to specify the frame boundaries: + * + * {{{ + * Window.rowsBetween(Window.unboundedPreceding, Window.currentRow) + * }}} + * + * @since 3.4.0 + */ + def currentRow: Long = 0 + + /** + * Creates a [[WindowSpec]] with the frame boundaries defined, from `start` (inclusive) to `end` + * (inclusive). + * + * Both `start` and `end` are positions relative to the current row. For example, "0" means + * "current row", while "-1" means the row before the current row, and "5" means the fifth row + * after the current row. + * + * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and + * `Window.currentRow` to specify special boundary values, rather than using integral values + * directly. + * + * A row based boundary is based on the position of the row within the partition. An offset + * indicates the number of rows above or below the current row, the frame for the current row + * starts or ends.
For instance, given a row based sliding frame with a lower bound offset of -1 + * and an upper bound offset of +2, the frame for row with index 5 would range from index 4 to + * index 7. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * val byCategoryOrderedById = + * Window.partitionBy($"category").orderBy($"id").rowsBetween(Window.currentRow, 1) + * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show() + * + * +---+--------+---+ + * | id|category|sum| + * +---+--------+---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 2| + * | 1| a| 3| + * | 2| a| 2| + * +---+--------+---+ + * }}} + * + * @param start + * boundary start, inclusive. The frame is unbounded if this is the minimum long value + * (`Window.unboundedPreceding`). + * @param end + * boundary end, inclusive. The frame is unbounded if this is the maximum long value + * (`Window.unboundedFollowing`). + * @since 3.4.0 + */ + // Note: when updating the doc for this method, also update WindowSpec.rowsBetween. + def rowsBetween(start: Long, end: Long): WindowSpec = { + spec.rowsBetween(start, end) + } + + /** + * Creates a [[WindowSpec]] with the frame boundaries defined, from `start` (inclusive) to `end` + * (inclusive). + * + * Both `start` and `end` are relative to the current row. For example, "0" means "current row", + * while "-1" means one off before the current row, and "5" means the five off after the current + * row. + * + * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and + * `Window.currentRow` to specify special boundary values, rather than using long values + * directly. + * + * A range-based boundary is based on the actual value of the ORDER BY expression(s).
An offset + * is used to alter the value of the ORDER BY expression, for instance if the current ORDER BY + * expression has a value of 10 and the lower bound offset is -3, the resulting lower bound for + * the current row will be 10 - 3 = 7. This however puts a number of constraints on the ORDER BY + * expressions: there can be only one expression and this expression must have a numerical data + * type. An exception can be made when the offset is unbounded, because no value modification is + * needed; in this case multiple and non-numeric ORDER BY expressions are allowed. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * val byCategoryOrderedById = + * Window.partitionBy($"category").orderBy($"id").rangeBetween(Window.currentRow, 1) + * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show() + * + * +---+--------+---+ + * | id|category|sum| + * +---+--------+---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 4| + * | 1| a| 4| + * | 2| a| 2| + * +---+--------+---+ + * }}} + * + * @param start + * boundary start, inclusive. The frame is unbounded if this is the minimum long value + * (`Window.unboundedPreceding`). + * @param end + * boundary end, inclusive. The frame is unbounded if this is the maximum long value + * (`Window.unboundedFollowing`). + * @since 3.4.0 + */ + // Note: when updating the doc for this method, also update WindowSpec.rangeBetween. + def rangeBetween(start: Long, end: Long): WindowSpec = { + spec.rangeBetween(start, end) + } + + private[sql] def spec: WindowSpec = { + new WindowSpec(Seq.empty, Seq.empty, None) + } + +} + +/** + * Utility functions for defining window in DataFrames.
+ * + * {{{ + * // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + * Window.partitionBy("country").orderBy("date") + * .rowsBetween(Window.unboundedPreceding, Window.currentRow) + * + * // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING + * Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3) + * }}} + * + * @since 3.4.0 + */ +@Stable +class Window private () // So we can see Window in JavaDoc. diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala new file mode 100644 index 00000000000..cecfb6a0d91 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.expressions + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Stable +import org.apache.spark.connect.proto +import org.apache.spark.sql.Column + +/** + * A window specification that defines the partitioning, ordering, and frame boundaries. 
+ * + * Use the static methods in [[Window]] to create a [[WindowSpec]]. + * + * @since 3.4.0 + */ +@Stable +class WindowSpec private[sql] ( + partitionSpec: Seq[proto.Expression], + orderSpec: Seq[proto.Expression.SortOrder], + frame: Option[proto.Expression.Window.WindowFrame]) { + + /** + * Defines the partitioning columns in a [[WindowSpec]]. + * @since 3.4.0 + */ + @scala.annotation.varargs + def partitionBy(colName: String, colNames: String*): WindowSpec = { + partitionBy((colName +: colNames).map(Column(_)): _*) + } + + /** + * Defines the partitioning columns in a [[WindowSpec]]. + * @since 3.4.0 + */ + @scala.annotation.varargs + def partitionBy(cols: Column*): WindowSpec = { + new WindowSpec(cols.map(_.expr), orderSpec, frame) + } + + /** + * Defines the ordering columns in a [[WindowSpec]]. + * @since 3.4.0 + */ + @scala.annotation.varargs + def orderBy(colName: String, colNames: String*): WindowSpec = { + orderBy((colName +: colNames).map(Column(_)): _*) + } + + /** + * Defines the ordering columns in a [[WindowSpec]]. + * @since 3.4.0 + */ + @scala.annotation.varargs + def orderBy(cols: Column*): WindowSpec = { + val sortOrder: Seq[proto.Expression.SortOrder] = cols.map(_.sortOrder) + new WindowSpec(partitionSpec, sortOrder, frame) + } + + /** + * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). + * + * Both `start` and `end` are relative positions from the current row. For example, "0" means + * "current row", while "-1" means the row before the current row, and "5" means the fifth row + * after the current row. + * + * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and + * `Window.currentRow` to specify special boundary values, rather than using integral values + * directly. + * + * A row based boundary is based on the position of the row within the partition. An offset + * indicates the number of rows above or below the current row, the frame for the current row + * starts or ends. 
For instance, given a row based sliding frame with a lower bound offset of -1 + * and an upper bound offset of +2, the frame for row with index 5 would range from index 4 to + * index 7. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * val byCategoryOrderedById = + * Window.partitionBy($"category").orderBy($"id").rowsBetween(Window.currentRow, 1) + * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show() + * + * +---+--------+---+ + * | id|category|sum| + * +---+--------+---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 2| + * | 1| a| 3| + * | 2| a| 2| + * +---+--------+---+ + * }}} + * + * @param start + * boundary start, inclusive. The frame is unbounded if this is the minimum long value + * (`Window.unboundedPreceding`). + * @param end + * boundary end, inclusive. The frame is unbounded if this is the maximum long value + * (`Window.unboundedFollowing`). + * @since 3.4.0 + */ + // Note: when updating the doc for this method, also update Window.rowsBetween. + def rowsBetween(start: Long, end: Long): WindowSpec = { + new WindowSpec( + partitionSpec, + orderSpec, + Some( + toWindowFrame( + proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_ROW, + start, + end, + true))) + } + + /** + * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). + * + * Both `start` and `end` are relative to the current row. For example, "0" means "current + * row", while "-1" means one off before the current row, and "5" means the five off after the + * current row. + * + * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and + * `Window.currentRow` to specify special boundary values, rather than using long values + * directly. + * + * A range-based boundary is based on the actual value of the ORDER BY expression(s).
An offset + * is used to alter the value of the ORDER BY expression, for instance if the current ORDER BY + * expression has a value of 10 and the lower bound offset is -3, the resulting lower bound for + * the current row will be 10 - 3 = 7. This however puts a number of constraints on the ORDER BY + * expressions: there can be only one expression and this expression must have a numerical data + * type. An exception can be made when the offset is unbounded, because no value modification is + * needed; in this case multiple and non-numeric ORDER BY expressions are allowed. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * val byCategoryOrderedById = + * Window.partitionBy($"category").orderBy($"id").rangeBetween(Window.currentRow, 1) + * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show() + * + * +---+--------+---+ + * | id|category|sum| + * +---+--------+---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 4| + * | 1| a| 4| + * | 2| a| 2| + * +---+--------+---+ + * }}} + * + * @param start + * boundary start, inclusive. The frame is unbounded if this is the minimum long value + * (`Window.unboundedPreceding`). + * @param end + * boundary end, inclusive. The frame is unbounded if this is the maximum long value + * (`Window.unboundedFollowing`). + * @since 3.4.0 + */ + // Note: when updating the doc for this method, also update Window.rangeBetween. + def rangeBetween(start: Long, end: Long): WindowSpec = { + new WindowSpec( + partitionSpec, + orderSpec, + Some( + toWindowFrame( + proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_RANGE, + start, + end, + false))) + } + + /** + * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
+ */ + private[sql] def withAggregate(aggregate: Column): Column = { + Column { builder => + val windowBuilder = builder.getWindowBuilder + windowBuilder.setWindowFunction(aggregate.expr) + if (frame.isDefined) { + windowBuilder.setFrameSpec(frame.get) + } + windowBuilder.addAllPartitionSpec(partitionSpec.asJava) + windowBuilder.addAllOrderSpec(orderSpec.asJava) + } + } + + private[sql] def toWindowFrame( + frameType: proto.Expression.Window.WindowFrame.FrameType, + start: Long, + end: Long, + isRowBetween: Boolean): proto.Expression.Window.WindowFrame = { + val windowFrameBuilder = proto.Expression.Window.WindowFrame.newBuilder() + windowFrameBuilder.setFrameType(frameType) + start match { + case 0 => windowFrameBuilder.getLowerBuilder.setCurrentRow(true) + case Long.MinValue => windowFrameBuilder.getLowerBuilder.setUnbounded(true) + case x if isRowBetween && Int.MinValue <= x && x <= Int.MaxValue => + windowFrameBuilder.getLowerBuilder.getValueBuilder.getLiteralBuilder + .setInteger(start.toInt) + case _ if !isRowBetween => + windowFrameBuilder.getLowerBuilder.getValueBuilder.getLiteralBuilder.setLong(start) + case _ => throw new UnsupportedOperationException() + } + + end match { + case 0 => windowFrameBuilder.getUpperBuilder.setCurrentRow(true) + case Long.MaxValue => windowFrameBuilder.getUpperBuilder.setUnbounded(true) + case x if isRowBetween && Int.MinValue <= x && x <= Int.MaxValue => + windowFrameBuilder.getUpperBuilder.getValueBuilder.getLiteralBuilder + .setInteger(end.toInt) + case _ if !isRowBetween => + windowFrameBuilder.getUpperBuilder.getValueBuilder.getLiteralBuilder.setLong(end) + case _ => throw new UnsupportedOperationException() + } + + windowFrameBuilder.build() + } +} diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index b9ae66b2b9e..4cd7bfa0887 100644 --- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.connect.proto import org.apache.spark.internal.Logging import org.apache.spark.sql.{functions => fn} import org.apache.spark.sql.connect.client.SparkConnectClient +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types._ @@ -1645,4 +1646,14 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit } /* Window API */ + test("window") { + simple.select( + fn.min("id").over(Window.partitionBy(Column("a"), Column("b"))), + fn.min("id").over(Window.partitionBy("a", "b")), + fn.min("id").over(Window.orderBy(Column("a"), Column("b"))), + fn.min("id").over(Window.orderBy("a", "b")), + fn.min("id").over(Window.orderBy("a").rowsBetween(2L, 3L)), + fn.min("id").over(Window.orderBy("a").rangeBetween(2L, 3L)), + fn.count(Column("id")).over()) + } } diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain new file mode 100644 index 00000000000..6f816957925 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain @@ -0,0 +1,8 @@ +Project [min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING [...] 
++- Project [id#0L, a#0, b#0, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS [...] + +- Window [count(id#0L) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L] + +- Window [min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 3)) AS min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 3 FOLLOWING)#0L, min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, cast(2 as int), cast(3 as int))) AS min(id) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN 2 FOLLOWING AND 3 FOLLOWING)#0L], [a#0 ASC NULLS FIRST] + +- Window [min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, b#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, b#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST [...] 
+ +- Window [min(id#0L) windowspecdefinition(a#0, b#0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id#0L) windowspecdefinition(a#0, b#0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L], [a#0, b#0] + +- Project [id#0L, a#0, b#0] + +- LocalRelation <empty>, [id#0L, a#0, b#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/window.json b/connector/connect/common/src/test/resources/query-tests/queries/window.json new file mode 100644 index 00000000000..8649a9d6e54 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/window.json @@ -0,0 +1,205 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "min", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "b" + } + }] + } + }, { + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "min", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "b" + } + }] + } + }, { + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "min", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + }, + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, + "direction": 
"SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }, { + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "b" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }, { + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "min", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + }, + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }, { + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "b" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }, { + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "min", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + }, + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }], + "frameSpec": { + "frameType": "FRAME_TYPE_ROW", + "lower": { + "value": { + "literal": { + "integer": 2 + } + } + }, + "upper": { + "value": { + "literal": { + "integer": 3 + } + } + } + } + } + }, { + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "min", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + }, + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }], + "frameSpec": { + "frameType": "FRAME_TYPE_RANGE", + "lower": { + "value": { + "literal": { + "long": "2" + } + } + }, + "upper": { + "value": { + "literal": { + "long": "3" + } + } + } + } + } + }, { + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "count", + 
"arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }] + } + } + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin new file mode 100644 index 00000000000..caa9d66934c --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin @@ -0,0 +1,43 @@ +� +$Z" struct<id:bigint,a:int,b:double>!Z + +min +id +a +b!Z + +min +id +a +b-Z+ + +min +id + +a + +b-Z+ + +min +id + +a + +b4Z2 + +min +id + +a" +0 +04Z2 + +min +id + +a" +8 +8Z + +count +id \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org