[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21082
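For readers following along, the feature merged here lets a grouped aggregate pandas UDF run as a window function over an unbounded frame. A minimal sketch of the user-facing API, mirroring the doctest added to `functions.py` later in this thread (not an excerpt from the PR itself):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas Series holding every value of the frame; return one scalar.
    return v.mean()

# Only unbounded frames are supported for pandas UDF window functions here.
w = Window.partitionBy("id").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("mean_v", mean_udf(df["v"]).over(w)).show()
```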
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r194133638

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -424,6 +424,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  object Window extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalWindow(
+          WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) =>
+        execution.window.WindowExec(
+          windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+
+      case PhysicalWindow(
+          WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) =>
+        execution.python.WindowInPandasExec(
+          windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
--- End diff --

Added
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r194133573

--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
+def wrap_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Added comments to describe the function
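For reference, a sketch of what the commented wrapper plausibly looks like; the comment wording below is mine, not lifted from the PR, and the trailing `return lambda ...` line is assumed from the pattern of the neighboring wrappers in `worker.py`:

```python
def wrap_window_agg_pandas_udf(f, return_type):
    arrow_return_type = to_arrow_type(return_type)

    def wrapped(*series):
        import pandas as pd
        # The user's UDF reduces the whole frame to a single scalar, but the
        # window operator expects one output value per input row, so repeat
        # the scalar to the length of the input before returning it.
        result = f(*series)
        return pd.Series([result]).repeat(len(series[0]))

    return lambda *a: (wrapped(*a), arrow_return_type)
```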
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r194130366

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala ---
@@ -34,7 +34,12 @@ object PythonUDF {
     e.isInstanceOf[PythonUDF] && SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
+    e.isInstanceOf[PythonUDF] &&
+      e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+  }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
--- End diff --

Sounds good
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r194129021

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
                     'mixture.*aggregate function.*group aggregate pandas UDF'):
                 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+    not _have_pandas or not _have_pyarrow,
+    _pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+    @property
+    def data(self):
+        from pyspark.sql.functions import array, explode, col, lit
+        return self.spark.range(10).toDF('id') \
+            .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
+            .withColumn("v", explode(col('vs'))) \
+            .drop('vs') \
+            .withColumn('w', lit(1.0))
+
+    @property
+    def python_plus_one(self):
+        from pyspark.sql.functions import udf
+        return udf(lambda v: v + 1, 'double')
+
+    @property
+    def pandas_scalar_time_two(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        return pandas_udf(lambda v: v * 2, 'double')
+
+    @property
+    def pandas_agg_mean_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
+        def avg(v):
+            return v.mean()
+        return avg
+
+    @property
+    def pandas_agg_max_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
+        def max(v):
+            return v.max()
+        return max
+
+    @property
+    def pandas_agg_min_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
+        def min(v):
+            return v.min()
+        return min
+
+    @property
+    def unbounded_window(self):
+        return Window.partitionBy('id') \
+            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+
+    @property
+    def ordered_window(self):
+        return Window.partitionBy('id').orderBy('v')
+
+    @property
+    def unpartitioned_window(self):
+        return Window.partitionBy()
--- End diff --

I think we can rely on `Window.partitionBy()` returning an unbounded window here; otherwise there would be too many combinations to test. But I am OK with adding tests for `Window.partitionBy().rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)` in addition to the existing ones. WDYT?
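A sketch of the extra case under discussion, assuming the `data` and `pandas_agg_mean_udf` fixtures above; the test name and body are illustrative, not taken from the PR:

```python
def test_unpartitioned_unbounded_window(self):
    from pyspark.sql.functions import mean

    df = self.data
    w1 = Window.partitionBy()  # implicitly unbounded
    w2 = Window.partitionBy().rowsBetween(
        Window.unboundedPreceding, Window.unboundedFollowing)

    result = df.withColumn('mean_v', self.pandas_agg_mean_udf(df['v']).over(w2))
    expected = df.withColumn('mean_v', mean(df['v']).over(w1))

    self.assertPandasEqual(expected.toPandas(), result.toPandas())
```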
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r193327794

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+    child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (partitionSpec.isEmpty) {
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
+      AllTuples :: Nil
+    } else {
+      ClusteredDistribution(partitionSpec) :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = {
+    val references = expressions.zipWithIndex.map{ case (e, i) =>
--- End diff --

`map{` -> `map {`
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r193326454

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -268,3 +269,40 @@ object PhysicalAggregation {
     case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
--- End diff --

nit: `private type`
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r193323738

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ---
@@ -297,6 +297,37 @@ trait WindowFunction extends Expression {
   def frame: WindowFrame = UnspecifiedFrame
 }
 
+/**
+ * Case objects that describe whether a window function is a SQL window function or a Python
+ * user-defined window function.
+ */
+sealed trait WindowFunctionType
+
+object WindowFunctionType {
+  case object SQL extends WindowFunctionType
+  case object Python extends WindowFunctionType
+
+  def functionType(windowExpression: NamedExpression): WindowFunctionType = {
+    val t = windowExpression.collectFirst {
+      case _: WindowFunction | _: AggregateFunction => SQL
+      case udf: PythonUDF if PythonUDF.isWindowPandasUDF(udf) => Python
+    }
+
+    // Normally a window expression would either have either a SQL window function, a SQL
--- End diff --

`either have either` ... :-).
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r193323414

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -424,6 +424,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  object Window extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalWindow(
+          WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) =>
+        execution.window.WindowExec(
+          windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+
+      case PhysicalWindow(
+          WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) =>
+        execution.python.WindowInPandasExec(
+          windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
--- End diff --

tiny nit: I would add a newline below
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r193320743

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala ---
@@ -34,7 +34,12 @@ object PythonUDF {
     e.isInstanceOf[PythonUDF] && SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
+    e.isInstanceOf[PythonUDF] &&
+      e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+  }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
--- End diff --

@icexelloss, can we maybe do:

```
def isWindowPandasUDF(e: Expression): Boolean = isGroupedAggPandasUDF(e)
```

and explain why they can be the same ..
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192182678

--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
+def wrap_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Yes - I tried to do this on the Java side, but merging the input rows and the output of the udf is tricky and complicated when they are not a 1-1 mapping. So I ended up doing this..
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192151282

--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
+def wrap_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series([result]).repeat(len(series[0]))
--- End diff --

So, this place is the only place where it's diverted (by repeating?); therefore, it needs a window-specific attribute?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192150984

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala ---
@@ -34,7 +34,12 @@ object PythonUDF {
     e.isInstanceOf[PythonUDF] && SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
+    e.isInstanceOf[PythonUDF] &&
+      e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+  }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
--- End diff --

Hm .. this bit looks a bit odd because the condition is the same and only the name is different. We should at least leave a comment ...
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192146812

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -112,12 +113,19 @@ trait CheckAnalysis extends PredicateHelper {
             failAnalysis("An offset window function can only be evaluated in an ordered " +
               s"row-based window frame with a single offset: $w")
 
+          case _ @ WindowExpression(_: PythonUDF,
+            WindowSpecDefinition(_, _, frame: SpecifiedWindowFrame))
+              if !frame.isUnbounded =>
+            failAnalysis(s"Only unbounded window frame is supported with Pandas UDFs.")
--- End diff --

leading `s` seems not needed.
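Concretely, the new analysis check fires on something like the following sketch (the error string is quoted from the diff above; `df` and `mean_udf` are assumed to be a DataFrame and a grouped aggregate pandas UDF):

```python
from pyspark.sql import Window

# A bounded, row-based frame: rejected for pandas UDFs by this check.
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 1)

# df.withColumn('m', mean_udf(df['v']).over(w)).collect()
# ...raises AnalysisException:
# "Only unbounded window frame is supported with Pandas UDFs."
```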
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192146449

--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
+def wrap_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Let's leave a short comment on what we are doing here.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192142018

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
[... the same WindowPandasUDFTests fixtures quoted in full above ...]
+    @property
+    def unbounded_window(self):
+        return Window.partitionBy('id') \
+            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+
+    @property
+    def ordered_window(self):
+        return Window.partitionBy('id').orderBy('v')
+
+    @property
+    def unpartitioned_window(self):
+        return Window.partitionBy()
--- End diff --

Shall we test `Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)` too?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192140997

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
[... the same WindowPandasUDFTests fixtures quoted in full above ...]
+
+    def test_simple(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType, percent_rank, mean, max
+
+        df = self.data
+        w = self.unbounded_window
+
+        mean_udf = self.pandas_agg_mean_udf
+
+        result1 = df.withColumn('mean_v', mean_udf(df['v']).over(w))
+        expected1 = df.withColumn('mean_v', mean(df['v']).over(w))
+
+        result2 = df.select(mean_udf(df['v']).over(w))
+        expected2 = df.select(mean(df['v']).over(w))
+
+        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+        self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
+
+    def test_multiple_udfs(self):
+        from pyspark.sql.functions import max, min, mean
+
+        df = self.data
+        w = self.unbounded_window
+
+        result1 = df.withColumn('mean_v', self.pandas_agg_mean_udf(df['v']).over(w)) \
+            .withColumn('max_v', self.pandas_agg_max_udf(df['v']).over(w)) \
+            .withColumn('min_w', self.pandas_agg_min_udf(df['w']).over(w)) \
--- End diff --

Trailing `\`.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r192138739

--- Diff: python/pyspark/sql/functions.py ---
@@ -2321,7 +2323,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        | 2|6.0|
        +---+---+
 
-    .. seealso:: :meth:`pyspark.sql.GroupedData.agg`
+    This example shows using grouped aggregated UDFs as window functions. Note that only
+    unbounded window frame is supported at the moment:
+
+    >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+    >>> from pyspark.sql import Window
+    >>> df = spark.createDataFrame(
+    ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+    ...     ("id", "v"))
+    >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+    ... def mean_udf(v):
+    ...     return v.mean()
+    >>> w = Window.partitionBy('id')
--- End diff --

Shall we explicitly show unbounded boundaries?
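The explicit form being suggested would presumably read as follows (a sketch; the final doctest wording may differ):

```python
>>> w = Window.partitionBy('id') \
...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
```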
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r191801865

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -0,0 +1,173 @@
[... license header and imports identical to the excerpt quoted above ...]
+case class WindowInPandasExec(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
--- End diff --

I am not sure. Currently python and window live under different package namespaces (`execution.python` and `execution.window`). To create a common base class we would probably need to refactor the namespace hierarchy somehow and remove some of the `private[python]` and `private[window]` restrictions. I think we will face this problem when trying to do rolling windows with pandas UDFs, because we will likely want to reuse some of the code under the `execution.window` package, but I am not sure we should resolve the python/window namespace question in this PR.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r191740985

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -0,0 +1,173 @@
[... license header and imports identical to the excerpt quoted above ...]
+case class WindowInPandasExec(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
--- End diff --

Minor comment: should we create a common base class for native window execution and Python window execution?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190633480

--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        The returned scalar can be either a python primitive type, e.g., `int`
        or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-    :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as
-    output types.
+    :class:`MapType` and :class:`StructType` are currently not supported as output types.
--- End diff --

Track in: https://issues.apache.org/jira/browse/SPARK-23633
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190340070

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1869,6 +1870,8 @@ class Analyzer(
           case window: WindowExpression => window.windowSpec
         }.distinct
 
+        val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Oh, now I understand that the long block is for `groupBy`, so the tuple will be the group key, right? I misread the block as being for `map` or something. Thanks!
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190334520

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1869,6 +1870,8 @@ class Analyzer(
           case window: WindowExpression => window.windowSpec
         }.distinct
 
+        val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Inlined.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190242582

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1869,6 +1870,8 @@ class Analyzer(
           case window: WindowExpression => window.windowSpec
         }.distinct
 
+        val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Ah I see. Let me inline this.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190177055

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
                     'mixture.*aggregate function.*group aggregate pandas UDF'):
                 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+    not _have_pandas or not _have_pyarrow,
+    _pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+    @property
+    def data(self):
+        from pyspark.sql.functions import array, explode, col, lit
+        return self.spark.range(10).toDF('id') \
+            .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
+            .withColumn("v", explode(col('vs'))) \
+            .drop('vs') \
+            .withColumn('w', lit(1.0))
+
+    @property
+    def python_plus_one(self):
--- End diff --

Sure.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190176898

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1869,6 +1870,8 @@ class Analyzer(
           case window: WindowExpression => window.windowSpec
         }.distinct
 
+        val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Yeah, I can see it is included in the tuple, but it seems like it is not used after that and is dropped at https://github.com/apache/spark/pull/21082/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R1886?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190065854

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
     case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+    (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+    case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) =>
+
+      if (windowExpressions.isEmpty) {
+        throw new AnalysisException(s"Window expression is empty in $expr")
--- End diff --

I added comments to explain this.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190061061

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
[... the same WindowPandasUDFTests fixtures quoted in full above, through `python_plus_one` ...]
--- End diff --

I agree those should be shared, but let's maybe do that in a separate PR? The refactoring will probably touch many test cases, and this PR is quite large already..
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190060687

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
     case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+    (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+    case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) =>
+
+      if (windowExpressions.isEmpty) {
+        throw new AnalysisException(s"Window expression is empty in $expr")
--- End diff --

The window expression should not be empty here, so this should not be reached; the check is just for safety. Let me add a comment.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r190060086

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1869,6 +1870,8 @@ class Analyzer(
           case window: WindowExpression => window.windowSpec
         }.distinct
 
+        val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

This is used for grouping window functions into SQL window functions and pandas UDFs and creating a different logical node for each. The value is used here: https://github.com/apache/spark/pull/21082/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R1886
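As a toy illustration of that grouping (plain Python, not the analyzer's actual Scala code; the tuples below are hypothetical stand-ins for analyzed window expressions):

```python
from collections import defaultdict

# Each window expression is keyed by (window spec, function type).
window_exprs = [("w1", "SQL"), ("w1", "Python"), ("w2", "SQL")]

buckets = defaultdict(list)
for spec, func_type in window_exprs:
    # Because the function type is part of the grouping key, SQL window
    # functions and pandas UDFs over the same spec end up in separate
    # buckets, each planned as its own logical Window node.
    buckets[(spec, func_type)].append((spec, func_type))

print(sorted(buckets))  # [('w1', 'Python'), ('w1', 'SQL'), ('w2', 'SQL')]
```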
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r189198335

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
[... the same WindowPandasUDFTests fixtures quoted in full above, through `python_plus_one` ...]
--- End diff --

Shall we move the `pandas_udf`s for tests to a common place?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r189161794

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1869,6 +1870,8 @@ class Analyzer(
           case window: WindowExpression => window.windowSpec
         }.distinct
 
+        val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

What's this for? Seems like this is omitted soon.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r189216703

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
     case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+    (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+    case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) =>
+
+      if (windowExpressions.isEmpty) {
+        throw new AnalysisException(s"Window expression is empty in $expr")
--- End diff --

Can we reach here?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r187663449

--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
+def wrap_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Window aggregation results are broadcast to each input row, so we repeat the value here to match the number of input rows.
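To see the repeat in isolation with plain pandas (independent of Spark):

```python
import pandas as pd

frame_values = pd.Series([1.0, 2.0, 3.0])   # all rows of one window frame
result = frame_values.mean()                # the UDF's scalar result: 2.0

# Broadcast the scalar back to one value per input row, exactly as the
# quoted wrapper does with pd.Series([result]).repeat(len(series[0])).
out = pd.Series([result]).repeat(len(frame_values))
print(out.tolist())                         # [2.0, 2.0, 2.0]
```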
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r187479626

--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
+def wrap_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Just wondering why this needs to be repeated to the length of the series when grouped agg doesn't?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r185423199

--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        The returned scalar can be either a python primitive type, e.g., `int`
        or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-    :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as
-    output types.
+    :class:`MapType` and :class:`StructType` are currently not supported as output types.
--- End diff --

Yup, I think that works too. I left a comment only because it looked mismatched with this API doc and the SQL programming guide.
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r185339976

--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        The returned scalar can be either a python primitive type, e.g., `int`
        or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-    :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as
-    output types.
+    :class:`MapType` and :class:`StructType` are currently not supported as output types.
--- End diff --

I am leaning towards keeping this in the API doc and maybe making the SQL programming guide link to it. I think most users would look at the API docs first rather than the SQL programming guide, so it's probably a bit more convenient to have it here?
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875393

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
     case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+    (WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+    case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) =>
+
+      if (windowExpressions.isEmpty) {
+        throw new AnalysisException(s"Window expression is empty in $expr")
+      }
+
+      val windowFunctionType = windowExpressions.map(WindowFunctionType.functionType)
+        .reduceLeft ( (t1: WindowFunctionType, t2: WindowFunctionType) =>
--- End diff --

(BTW: ``` .reduceLeft { ... } ``` )
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875163

--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,236 @@ def test_invalid_args(self):
[... the same WindowPandasUDFTests fixtures, test_simple, and test_multiple_udfs quoted in full above ...]
+
+    def test_replace_existing(self):
+        from pyspark.sql.functions import mean
+
+        df = self.data
+        w = self.unbounded_window
+
+        result1 = df.withColumn('v', self.pandas_agg_mean_udf(df['v']).over(w))
+        expected1 = df.withColumn('v', mean(df['v']).over(w))
+
+        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+    def test_mixed_sql(self):
+        from pyspark.sql.functions import mean
+
+        df = self.data
+        w = self.unbounded_window
+        mean_udf = self.pandas_agg_mean_udf
+
+        result1 = df.withColumn('v', mean_udf(df['v'] * 2).over(w) + 1)
+        expected1 = df.withColumn('v', mean(df['v'] * 2).over(w) + 1)
+
+        self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
[Entry truncated in the archive; the review comment itself is missing.]
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875140

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -0,0 +1,174 @@
[... license header and imports identical to the excerpt quoted above ...]
+case class WindowInPandasExec(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+    child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (partitionSpec.isEmpty) {
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
+      AllTuples :: Nil
+    } else ClusteredDistribution(partitionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = {
+    val references = expressions.zipWithIndex.map{ case (e, i) =>
+      // Results of window expressions will be on the right side of child's output
+      BoundReference(child.output.size + i, e.dataType, e.nullable)
+    }
+    val unboundToRefMap = expressions.zip(references).toMap
+    val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
+    UnsafeProjection.create(
+      child.output ++ patchedWindowExpression,
+      child.output)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute()
+
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+    val sessionLocalTimeZone = conf.sessionLocalTimeZone
+    val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+
+    // Extract window expressions and window
[Entry truncated in the archive; the review comment itself is missing.]
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875120

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
[Entry truncated in the archive; the quoted diff is a verbatim duplicate of the WindowInPandasExec.scala excerpt in the previous entry, and the review comment itself is missing.]
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875102 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.File + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.util.Utils + +case class WindowInPandasExec( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + override def output: Seq[Attribute] = +child.output ++ windowExpression.map(_.toAttribute) + + override def requiredChildDistribution: Seq[Distribution] = { +if (partitionSpec.isEmpty) { + // Only show warning when the number of bytes is larger than 100 MB? + logWarning("No Partition Defined for Window operation! Moving all data to a single " ++ "partition, this can cause serious performance degradation.") + AllTuples :: Nil +} else ClusteredDistribution(partitionSpec) :: Nil + } + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec) + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning + + private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = { +udf.children match { + case Seq(u: PythonUDF) => +val (chained, children) = collectFunctions(u) +(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children) + case children => +// There should not be any other UDFs, or the children can't be evaluated directly. +assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty)) +(ChainedPythonFunctions(Seq(udf.func)), udf.children) +} + } + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. 
+ */ + private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map{ case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + protected override def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val sessionLocalTimeZone = conf.sessionLocalTimeZone +val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone + +// Extract window expressions and window
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875086 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.File + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.util.Utils + +case class WindowInPandasExec( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + override def output: Seq[Attribute] = +child.output ++ windowExpression.map(_.toAttribute) + + override def requiredChildDistribution: Seq[Distribution] = { +if (partitionSpec.isEmpty) { + // Only show warning when the number of bytes is larger than 100 MB? + logWarning("No Partition Defined for Window operation! Moving all data to a single " ++ "partition, this can cause serious performance degradation.") + AllTuples :: Nil +} else ClusteredDistribution(partitionSpec) :: Nil --- End diff -- nit: I would do ``` else { } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875053 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -424,6 +424,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + object Window extends Strategy { +def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalWindow( + WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) => --- End diff -- nit: indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875039 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -624,7 +624,9 @@ object CollapseRepartition extends Rule[LogicalPlan] { object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) -if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty => +if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty + && WindowFunctionType.functionType(we1.head) == --- End diff -- nit: indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184875000 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -112,12 +113,19 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis("An offset window function can only be evaluated in an ordered " + s"row-based window frame with a single offset: $w") + case w @ WindowExpression(_: PythonUDF, + WindowSpecDefinition(_, _, frame: SpecifiedWindowFrame)) --- End diff -- indentation :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r184874826 --- Diff: python/pyspark/sql/functions.py --- @@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, functionType=None): The returned scalar can be either a python primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`. - :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as - output types. + :class:`MapType` and :class:`StructType` are currently not supported as output types. --- End diff -- @icexelloss, actually, should we keep this note? I think it matches https://spark.apache.org/docs/latest/sql-programming-guide.html#supported-sql-types, which we document both there and in SQLConf. Probably just leaving a link would be fine. Removing it entirely is okay with me too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
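For context, a minimal sketch of what the relaxed note allows: a scalar pandas UDF with an ArrayType return type. The DataFrame and UDF below are illustrative, not taken from the PR, and assume pandas and pyarrow are installed:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,)], ["v"])

# Scalar pandas UDF whose output type is ArrayType, which the updated
# docstring no longer lists as unsupported.
@pandas_udf(ArrayType(DoubleType()))
def duplicate(v):
    # v is a pandas Series of doubles; return a Series of lists.
    return v.apply(lambda x: [x, x])

df.select(duplicate(df["v"])).show()
```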
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183769433 --- Diff: python/pyspark/sql/functions.py --- @@ -2321,7 +2323,30 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 2|6.0| +---+---+ - .. seealso:: :meth:`pyspark.sql.GroupedData.agg` + This example shows using grouped aggregated UDFs as window functions. Note that only + unbounded window frame is supported at the moment: + + >>> from pyspark.sql.functions import pandas_udf, PandasUDFType + >>> from pyspark.sql import Window + >>> df = spark.createDataFrame( + ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], + ... ("id", "v")) + >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP --- End diff -- Yes, exactly. The idea is that the producer of the UDF can define a grouped agg udf, such as a weighted mean, and the consumer can use that UDF in both groupby and window, similar to how SQL aggregate functions work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
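To make the producer/consumer point concrete, here is a minimal runnable sketch based on the doctest quoted above (assuming pandas and pyarrow are installed, and a Spark build that includes this PR): the same GROUPED_AGG UDF is used once with groupby and once over an unbounded window frame.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas Series holding all values of the group (or window frame).
    return v.mean()

# Used as a group aggregate function...
df.groupby("id").agg(mean_udf(df["v"]).alias("mean_v")).show()

# ...and as a window function over an unbounded frame, which is what
# this PR enables. Bounded frames are not supported at the moment.
w = (Window.partitionBy("id")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn("mean_v", mean_udf(df["v"]).over(w)).show()
```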
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183768758 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -268,3 +269,38 @@ object PhysicalAggregation { case _ => None } } + +/** + * An extractor used when planning physical execution of a window. This extractor outputs + * the window function type of the logical window. + * + * The input logical window must contain same type of window functions, which is ensured by + * the rule ExtractWindowExpressions in the analyzer. + */ +object PhysicalWindow { + // windowFunctionType, windowExpression, partitionSpec, orderSpec, child + type ReturnType = +(WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan) + + def unapply(a: Any): Option[ReturnType] = a match { +case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) => + + if (windowExpressions.isEmpty) { +throw new AnalysisException(s"Window expression is empty in $expr") + } + + val windowFunctionType = windowExpressions.map(WindowFunctionType.functionType) +.reduceLeft ( (t1: WindowFunctionType, t2: WindowFunctionType) => --- End diff -- If we want to do this check in the Analyzer, then we would need to carry the WindowFunctionType in the logical plan. I did it this way to avoid changing the logical node. I am open to adding WindowFunctionType to the logical plan, though. What do other people think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
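The quoted diff is cut off mid-expression; for readers, here is a rough Python sketch of what the reduceLeft is doing, folding over the window expressions' function types and failing as soon as two of them disagree. The names and error text are illustrative, not Spark API:

```python
from functools import reduce

def common_window_function_type(types):
    # Fold left over the function types; a mismatch means the logical
    # Window mixes SQL and Python window functions, which is invalid here.
    def merge(t1, t2):
        if t1 != t2:
            raise ValueError(
                "Found different window function type in %r" % (types,))
        return t1
    return reduce(merge, types)

print(common_window_function_type(["SQL", "SQL"]))  # -> "SQL"
common_window_function_type(["SQL", "Python"])      # raises ValueError
```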
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183572930 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -268,3 +269,38 @@ object PhysicalAggregation { case _ => None } } + +/** + * An extractor used when planning physical execution of a window. This extractor outputs + * the window function type of the logical window. + * + * The input logical window must contain same type of window functions, which is ensured by + * the rule ExtractWindowExpressions in the analyzer. + */ +object PhysicalWindow { + // windowFunctionType, windowExpression, partitionSpec, orderSpec, child + type ReturnType = +(WindowFunctionType, Seq[NamedExpression], Seq[Expression], Seq[SortOrder], LogicalPlan) + + def unapply(a: Any): Option[ReturnType] = a match { +case expr @ logical.Window(windowExpressions, partitionSpec, orderSpec, child) => + + if (windowExpressions.isEmpty) { +throw new AnalysisException(s"Window expression is empty in $expr") + } + + val windowFunctionType = windowExpressions.map(WindowFunctionType.functionType) +.reduceLeft ( (t1: WindowFunctionType, t2: WindowFunctionType) => --- End diff -- Should we do this analysis check in Analyzer? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183416353 --- Diff: python/pyspark/sql/functions.py --- @@ -2321,7 +2323,30 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 2|6.0| +---+---+ - .. seealso:: :meth:`pyspark.sql.GroupedData.agg` + This example shows using grouped aggregated UDFs as window functions. Note that only + unbounded window frame is supported at the moment: + + >>> from pyspark.sql.functions import pandas_udf, PandasUDFType + >>> from pyspark.sql import Window + >>> df = spark.createDataFrame( + ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], + ... ("id", "v")) + >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP --- End diff -- So we don't have `PandasUDFType.WINDOW_AGG`, and a pandas udf defined as `PandasUDFType.GROUPED_AGG` can be used with both `groupby` and `Window`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183417848 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala --- @@ -34,10 +34,15 @@ object PythonUDF { e.isInstanceOf[PythonUDF] && SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType) } - def isGroupAggPandasUDF(e: Expression): Boolean = { + def isGroupedAggPandasUDF(e: Expression): Boolean = { e.isInstanceOf[PythonUDF] && e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF } + + def isWindowPandasUDF(e: Expression): Boolean = { +e.isInstanceOf[PythonUDF] && +e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF --- End diff -- nit: indent style. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183412509 --- Diff: python/pyspark/sql/functions.py --- @@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, functionType=None): The returned scalar can be either a python primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`. - :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as - output types. + :class:`MapType` and :class:`StructType` are currently not supported as output types. - Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` + Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` and + :meth:`pyspark.sql.Window` + + This example show using grouped aggregated UDFS with groupby: --- End diff -- typo: `shows`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183412270 --- Diff: python/pyspark/sql/functions.py --- @@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, functionType=None): The returned scalar can be either a python primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`. - :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as - output types. + :class:`MapType` and :class:`StructType` are currently not supported as output types. - Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` + Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` and + :meth:`pyspark.sql.Window` --- End diff -- ``` :class:`pyspark.sql.Window` ```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183220435 --- Diff: python/pyspark/sql/tests.py --- @@ -5156,6 +5156,15 @@ def test_retain_group_columns(self): expected1 = df.groupby(df.id).agg(sum(df.v)) self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) +def test_array_type(self): --- End diff -- This is related, but I figured it shouldn't hurt to add an array test in GroupedAggPandasUDFTests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
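A sketch of roughly what such a test could look like; the `self.data` fixture and the exact assertion are assumptions patterned on the surrounding test class, not the PR's literal code:

```python
def test_array_type(self):
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df = self.data  # assumed fixture with columns 'id' and 'v'
    # Grouped agg pandas UDF with an array output type.
    array_udf = pandas_udf(
        lambda x: [1.0, 2.0], 'array<double>', PandasUDFType.GROUPED_AGG)
    result1 = df.groupby('id').agg(array_udf(df['v']).alias('v2'))
    self.assertEqual(result1.first()['v2'], [1.0, 2.0])
```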
[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21082#discussion_r183220392 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala --- @@ -149,7 +149,7 @@ class AnalysisErrorSuite extends AnalysisTest { UnresolvedAttribute("a") :: Nil, SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil, UnspecifiedFrame)).as('window)), -"not supported within a window function" :: Nil) +"does not have any window functions" :: Nil) --- End diff -- This is because an analysis exception is now thrown earlier, by the rule ExtractWindowExpressions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
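Roughly, the user-facing behavior the updated test pins down might look like the following from PySpark (assuming a DataFrame `df` with columns `id` and `v`; the message text is inferred from the expected substring in the test above):

```python
from pyspark.sql import Window
from pyspark.sql.utils import AnalysisException

w = Window.partitionBy('id').orderBy('v')
try:
    # 'v' by itself is not a window function, so analysis should fail
    # with a message containing "does not have any window functions".
    df.select(df['v'].over(w)).collect()
except AnalysisException as e:
    print(e)
```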