Repository: spark

Updated Branches:
  refs/heads/branch-2.4 eff1c5016 -> f37bceadf
[SPARK-25842][SQL] Deprecate rangeBetween APIs introduced in SPARK-21608

## What changes were proposed in this pull request?

See the detailed information at https://issues.apache.org/jira/browse/SPARK-25841 on why these APIs should be deprecated and redesigned.

This patch also reverts https://github.com/apache/spark/commit/8acb51f08b448628b65e90af3b268994f9550e45 which applies to 2.4.

## How was this patch tested?

Only deprecation and doc changes.

Closes #22841 from rxin/SPARK-25842.

Authored-by: Reynold Xin <r...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 89d748b33c8636a1b1411c505921b0a585e1e6cb)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f37bcead
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f37bcead
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f37bcead

Branch: refs/heads/branch-2.4
Commit: f37bceadf2135348c006c3d37ab7d6101cfe2267
Parents: eff1c50
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Oct 26 13:17:24 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Oct 26 13:17:50 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 30 ---------
 python/pyspark/sql/window.py                    | 70 +++++---------
 .../apache/spark/sql/expressions/Window.scala   | 46 +------------
 .../spark/sql/expressions/WindowSpec.scala      | 45 +------------
 .../scala/org/apache/spark/sql/functions.scala  | 12 ++--
 5 files changed, 28 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
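For readers tracking the migration, the change this deprecation points at looks roughly like the sketch below (illustrative only, not part of the patch; byCategoryOrderedById is a made-up name, and any DataFrame with "id" and "category" columns would do):

    // Deprecated in 2.4: the Column-based boundaries from SPARK-21608, e.g.
    //   Window.partitionBy("category").orderBy("id").rangeBetween(currentRow(), lit(1))
    // Recommended: the Long-based overload plus the boundary constants on object Window.
    import org.apache.spark.sql.expressions.Window

    val byCategoryOrderedById =
      Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1L)

The Long-based rangeBetween(start: Long, end: Long) overload predates the Column-based one and is unaffected by this deprecation.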
- """ - sc = SparkContext._active_spark_context - return Column(sc._jvm.functions.currentRow()) - - # ---------------------- Date/Timestamp functions ------------------------------ @since(1.5) http://git-wip-us.apache.org/repos/asf/spark/blob/f37bcead/python/pyspark/sql/window.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py index d19ced9..e76563d 100644 --- a/python/pyspark/sql/window.py +++ b/python/pyspark/sql/window.py @@ -16,11 +16,9 @@ # import sys -if sys.version >= '3': - long = int from pyspark import since, SparkContext -from pyspark.sql.column import Column, _to_seq, _to_java_column +from pyspark.sql.column import _to_seq, _to_java_column __all__ = ["Window", "WindowSpec"] @@ -126,45 +124,20 @@ class Window(object): and "5" means the five off after the current row. We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``, - ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``, - ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow`` - to specify special boundary values, rather than using integral values directly. + and ``Window.currentRow`` to specify special boundary values, rather than using integral + values directly. :param start: boundary start, inclusive. - The frame is unbounded if this is ``Window.unboundedPreceding``, - a column returned by ``pyspark.sql.functions.unboundedPreceding``, or + The frame is unbounded if this is ``Window.unboundedPreceding``, or any value less than or equal to max(-sys.maxsize, -9223372036854775808). :param end: boundary end, inclusive. - The frame is unbounded if this is ``Window.unboundedFollowing``, - a column returned by ``pyspark.sql.functions.unboundedFollowing``, or + The frame is unbounded if this is ``Window.unboundedFollowing``, or any value greater than or equal to min(sys.maxsize, 9223372036854775807). - - >>> from pyspark.sql import functions as F, SparkSession, Window - >>> spark = SparkSession.builder.getOrCreate() - >>> df = spark.createDataFrame( - ... [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]) - >>> window = Window.orderBy("id").partitionBy("category").rangeBetween( - ... F.currentRow(), F.lit(1)) - >>> df.withColumn("sum", F.sum("id").over(window)).show() - +---+--------+---+ - | id|category|sum| - +---+--------+---+ - | 1| b| 3| - | 2| b| 5| - | 3| b| 3| - | 1| a| 4| - | 1| a| 4| - | 2| a| 2| - +---+--------+---+ """ - if isinstance(start, (int, long)) and isinstance(end, (int, long)): - if start <= Window._PRECEDING_THRESHOLD: - start = Window.unboundedPreceding - if end >= Window._FOLLOWING_THRESHOLD: - end = Window.unboundedFollowing - elif isinstance(start, Column) and isinstance(end, Column): - start = start._jc - end = end._jc + if start <= Window._PRECEDING_THRESHOLD: + start = Window.unboundedPreceding + if end >= Window._FOLLOWING_THRESHOLD: + end = Window.unboundedFollowing sc = SparkContext._active_spark_context jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end) return WindowSpec(jspec) @@ -239,34 +212,27 @@ class WindowSpec(object): and "5" means the five off after the current row. 
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
-        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
-        to specify special boundary values, rather than using integral values directly.
+        and ``Window.currentRow`` to specify special boundary values, rather than using integral
+        values directly.
 
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``Window.unboundedPreceding``,
-                      a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
+                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                       any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``Window.unboundedFollowing``,
-                    a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
+                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                     any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
-            if start <= Window._PRECEDING_THRESHOLD:
-                start = Window.unboundedPreceding
-            if end >= Window._FOLLOWING_THRESHOLD:
-                end = Window.unboundedFollowing
-        elif isinstance(start, Column) and isinstance(end, Column):
-            start = start._jc
-            end = end._jc
+        if start <= Window._PRECEDING_THRESHOLD:
+            start = Window.unboundedPreceding
+        if end >= Window._FOLLOWING_THRESHOLD:
+            end = Window.unboundedFollowing
         return WindowSpec(self._jspec.rangeBetween(start, end))
 
 
 def _test():
     import doctest
     SparkContext('local[4]', 'PythonTest')
-    (failure_count, test_count) = doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
+    (failure_count, test_count) = doctest.testmod()
     if failure_count:
         sys.exit(-1)
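The clamping retained above means any sufficiently extreme integer still produces an unbounded frame, so the named constants plus plain integers cover every case the removed Column helpers did. A minimal sketch of the most common unbounded frame, a per-partition running total, using only the retained API (illustrative, not part of the patch; the same spelling works in PySpark, where the constants are plain ints on pyspark.sql.window.Window):

    import org.apache.spark.sql.expressions.Window

    // Everything from the start of the partition through the current ORDER BY value.
    val runningTotalFrame = Window.partitionBy("category").orderBy("id")
      .rangeBetween(Window.unboundedPreceding, Window.currentRow)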

http://git-wip-us.apache.org/repos/asf/spark/blob/f37bcead/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index cd819ba..14dec8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -215,52 +215,10 @@ object Window {
   }
 
   /**
-   * Creates a [[WindowSpec]] with the frame boundaries defined,
-   * from `start` (inclusive) to `end` (inclusive).
-   *
-   * Both `start` and `end` are relative to the current row. For example, "lit(0)" means
-   * "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the
-   * five off after the current row.
-   *
-   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
-   * [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not
-   * transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
-   *
-   * A range-based boundary is based on the actual value of the ORDER BY
-   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
-   * instance if the current order by expression has a value of 10 and the lower bound offset
-   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
-   * number of constraints on the ORDER BY expressions: there can be only one expression and this
-   * expression must have a numerical/date/timestamp data type. An exception can be made when the
-   * offset is unbounded, because no value modification is needed, in this case multiple and
-   * non-numerical/date/timestamp data type ORDER BY expression are allowed.
-   *
-   * {{{
-   *   import org.apache.spark.sql.expressions.Window
-   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
-   *     .toDF("id", "category")
-   *   val byCategoryOrderedById =
-   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
-   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
-   *
-   *   +---+--------+---+
-   *   | id|category|sum|
-   *   +---+--------+---+
-   *   |  1|       b|  3|
-   *   |  2|       b|  5|
-   *   |  3|       b|  3|
-   *   |  1|       a|  4|
-   *   |  1|       a|  4|
-   *   |  2|       a|  2|
-   *   +---+--------+---+
-   * }}}
-   *
-   * @param start boundary start, inclusive. The frame is unbounded if the expression is
-   *              [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
-   * @param end boundary end, inclusive. The frame is unbounded if the expression is
-   *            [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
    * @since 2.3.0
    */
+  @deprecated("Use the version with Long parameter types", "2.4.0")
   def rangeBetween(start: Column, end: Column): WindowSpec = {
     spec.rangeBetween(start, end)
   }
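Since the scaladoc removed above was the place where the range-frame arithmetic was spelled out, a compact restatement may help: with a lower-bound offset of -3 and a current ORDER BY value of 10, the frame covers rows whose ordering value lies in [7, 10]. A sketch using the retained Long-based API (illustrative; "price" is a hypothetical numeric column, and range frames still require a single numerical/date/timestamp ORDER BY expression unless the boundary is unbounded):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    // Value-based frame: for a row where price = 10, covers rows with price in [7, 10].
    val withinThreeBelow = Window.orderBy("price").rangeBetween(-3L, 0L)
    // df.withColumn("nearbySum", sum("price").over(withinThreeBelow))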

http://git-wip-us.apache.org/repos/asf/spark/blob/f37bcead/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 4c41aa3..0cc43a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -210,51 +210,10 @@ class WindowSpec private[sql](
   }
 
   /**
-   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
-   *
-   * Both `start` and `end` are relative to the current row. For example, "lit(0)" means
-   * "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the
-   * five off after the current row.
-   *
-   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
-   * [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not
-   * transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
-   *
-   * A range-based boundary is based on the actual value of the ORDER BY
-   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
-   * instance if the current order by expression has a value of 10 and the lower bound offset
-   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
-   * number of constraints on the ORDER BY expressions: there can be only one expression and this
-   * expression must have a numerical/date/timestamp data type. An exception can be made when the
-   * offset is unbounded, because no value modification is needed, in this case multiple and
-   * non-numerical/date/timestamp data type ORDER BY expression are allowed.
-   *
-   * {{{
-   *   import org.apache.spark.sql.expressions.Window
-   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
-   *     .toDF("id", "category")
-   *   val byCategoryOrderedById =
-   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
-   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
-   *
-   *   +---+--------+---+
-   *   | id|category|sum|
-   *   +---+--------+---+
-   *   |  1|       b|  3|
-   *   |  2|       b|  5|
-   *   |  3|       b|  3|
-   *   |  1|       a|  4|
-   *   |  1|       a|  4|
-   *   |  2|       a|  2|
-   *   +---+--------+---+
-   * }}}
-   *
-   * @param start boundary start, inclusive. The frame is unbounded if the expression is
-   *              [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
-   * @param end boundary end, inclusive. The frame is unbounded if the expression is
-   *            [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
    * @since 2.3.0
    */
+  @deprecated("Use the version with Long parameter types", "2.4.0")
   def rangeBetween(start: Column, end: Column): WindowSpec = {
     new WindowSpec(
       partitionSpec,

http://git-wip-us.apache.org/repos/asf/spark/blob/f37bcead/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5dedc9d..9c4ad48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -830,30 +830,30 @@ object functions {
   // Window functions
   //////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Window function: returns the special frame boundary that represents the first row in the
-   * window partition.
+   * This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
    *
    * @group window_funcs
    * @since 2.3.0
    */
+  @deprecated("Use Window.unboundedPreceding", "2.4.0")
   def unboundedPreceding(): Column = Column(UnboundedPreceding)
 
   /**
-   * Window function: returns the special frame boundary that represents the last row in the
-   * window partition.
+   * This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
    *
    * @group window_funcs
    * @since 2.3.0
    */
+  @deprecated("Use Window.unboundedFollowing", "2.4.0")
   def unboundedFollowing(): Column = Column(UnboundedFollowing)
 
   /**
-   * Window function: returns the special frame boundary that represents the current row in the
-   * window partition.
+   * This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
    *
    * @group window_funcs
    * @since 2.3.0
    */
+  @deprecated("Use Window.currentRow", "2.4.0")
   def currentRow(): Column = Column(CurrentRow)
 
   /**
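For completeness, each deprecated Column-returning helper above has a direct replacement as a plain Long constant on org.apache.spark.sql.expressions.Window (a sketch of the mapping, not part of the patch):

    import org.apache.spark.sql.expressions.Window

    // Replacements for the deprecated helpers in org.apache.spark.sql.functions:
    val start: Long   = Window.unboundedPreceding   // was functions.unboundedPreceding(): Column
    val end: Long     = Window.unboundedFollowing   // was functions.unboundedFollowing(): Column
    val current: Long = Window.currentRow           // was functions.currentRow(): Column

These constants feed directly into rangeBetween(start: Long, end: Long) and rowsBetween(start: Long, end: Long), so no Column wrapping is needed.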