This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7e0a5ef903c [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty

7e0a5ef903c is described below

commit 7e0a5ef903c41eea3b0d1220bfabda2c8b8a5ac4
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Aug 3 16:11:20 2022 +0900

[SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty

This PR proposes to apply the projection when the grouping attributes are empty, so that the reordered columns of its child are respected.

This is needed to respect the column order in the child: without the projection, a global aggregation with Pandas UDFs can read the child's columns in the wrong order.

Yes, this is a user-facing change: it fixes the bug shown below:

```python
import pandas as pd
from pyspark.sql import functions as f

@f.pandas_udf("double")
def AVG(x: pd.Series) -> float:
    return x.mean()

abc = spark.createDataFrame([(1.0, 5.0, 17.0)], schema=["a", "b", "c"])
abc.agg(AVG("a"), AVG("c")).show()
abc.select("c", "a").agg(AVG("a"), AVG("c")).show()
```

**Before**

```
+------+------+
|AVG(a)|AVG(c)|
+------+------+
|  17.0|   1.0|
+------+------+
```

**After**

```
+------+------+
|AVG(a)|AVG(c)|
+------+------+
|   1.0|  17.0|
+------+------+
```

Manually tested, and added a unit test.

Closes #37390 from HyukjinKwon/SPARK-39962.
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 5335c784ae76c9cc0aaa7a4b57b3cd6b3891ad9a) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/execution/python/AggregateInPandasExec.scala | 5 +++-- .../apache/spark/sql/execution/python/PythonUDFSuite.scala | 13 +++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index dadf1129c34..791af2a6aee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -109,12 +109,13 @@ case class AggregateInPandasExec( inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else { val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output) - val grouped = if (groupingExpressions.isEmpty) { + val groupedItr = if (groupingExpressions.isEmpty) { // Use an empty unsafe row as a place holder for the grouping key Iterator((new UnsafeRow(), iter)) } else { GroupedIterator(iter, groupingExpressions, child.output) - }.map { case (key, rows) => + } + val grouped = groupedItr.map { case (key, rows) => (key, rows.map(prunedProj)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala index 45b57207c57..4ad7f901053 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala @@ -71,4 +71,17 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession { pythonTestUDF(count(pythonTestUDF(base("a") + 1)))) checkAnswer(df1, df2) } + + 
test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") { + assume(shouldTestGroupedAggPandasUDFs) + val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b") + + val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf") + val reorderedDf = df.select("b", "a") + val actual = reorderedDf.agg( + pandasTestUDF(reorderedDf("a")), pandasTestUDF(reorderedDf("b"))) + val expected = df.agg(pandasTestUDF(df("a")), pandasTestUDF(df("b"))) + + checkAnswer(actual, expected) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org