This is an automated email from the ASF dual-hosted git repository. cutlerb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 00d06ca [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs 00d06ca is described below commit 00d06cad564d5e3e5f78a687776d02fe0695a861 Author: HyukjinKwon <gurwls...@apache.org> AuthorDate: Wed Jun 10 15:54:07 2020 -0700 [SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs ### What changes were proposed in this pull request? This is another approach to fix the issue. See the previous attempt at https://github.com/apache/spark/pull/28745. It was too invasive, so I took a more conservative approach. This PR proposes to resolve grouping attributes separately first so that they can be properly referenced when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved without ambiguity. Previously, ```python from pyspark.sql.functions import * df = spark.createDataFrame([[1, 1]], ["column", "Score"]) @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP) def my_pandas_udf(pdf): return pdf.assign(Score=0.5) df.groupby('COLUMN').apply(my_pandas_udf).show() ``` failed as below: ``` pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;" ``` because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know which reference to take from the child projection. After this fix, it resolves the child projection first with grouping keys and passes, to `FlatMapGroupsInPandas`, the attribute as a grouping key from the child projection that is positionally selected. ### Why are the changes needed? To resolve grouping keys correctly. ### Does this PR introduce _any_ user-facing change? 
Yes, ```python from pyspark.sql.functions import * df = spark.createDataFrame([[1, 1]], ["column", "Score"]) @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP) def my_pandas_udf(pdf): return pdf.assign(Score=0.5) df.groupby('COLUMN').apply(my_pandas_udf).show() ``` ```python df1 = spark.createDataFrame([(1, 1)], ("column", "value")) df2 = spark.createDataFrame([(1, 1)], ("column", "value")) df1.groupby("COLUMN").cogroup( df2.groupby("COLUMN") ).applyInPandas(lambda r, l: r + l, df1.schema).show() ``` Before: ``` pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.; ``` ``` pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];; 'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L] :- Project [COLUMN#9L, column#9L, value#10L] : +- LogicalRDD [column#9L, value#10L], false +- Project [COLUMN#13L, column#13L, value#14L] +- LogicalRDD [column#13L, value#14L], false ``` After: ``` +------+-----+ |column|Score| +------+-----+ | 1| 0.5| +------+-----+ ``` ``` +------+-----+ |column|value| +------+-----+ | 2| 2| +------+-----+ ``` ### How was this patch tested? Unit tests were added, and the change was also tested manually. Closes #28777 from HyukjinKwon/SPARK-31915-another. 
Authored-by: HyukjinKwon <gurwls...@apache.org> Signed-off-by: Bryan Cutler <cutl...@gmail.com> --- python/pyspark/sql/tests/test_pandas_cogrouped_map.py | 18 +++++++++++++++++- python/pyspark/sql/tests/test_pandas_grouped_map.py | 10 ++++++++++ .../apache/spark/sql/RelationalGroupedDataset.scala | 17 ++++++++++------- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py index 3ed9d2a..c1cb30c 100644 --- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py @@ -19,7 +19,7 @@ import unittest import sys from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType -from pyspark.sql.types import DoubleType, StructType, StructField +from pyspark.sql.types import DoubleType, StructType, StructField, Row from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \ pandas_requirement_message, pyarrow_requirement_message from pyspark.testing.utils import QuietTest @@ -193,6 +193,22 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase): left.groupby('id').cogroup(right.groupby('id')) \ .applyInPandas(lambda: 1, StructType([StructField("d", DoubleType())])) + def test_case_insensitive_grouping_column(self): + # SPARK-31915: case-insensitive grouping column should work. 
+ df1 = self.spark.createDataFrame([(1, 1)], ("column", "value")) + + row = df1.groupby("ColUmn").cogroup( + df1.groupby("COLUMN") + ).applyInPandas(lambda r, l: r + l, "column long, value long").first() + self.assertEquals(row.asDict(), Row(column=2, value=2).asDict()) + + df2 = self.spark.createDataFrame([(1, 1)], ("column", "value")) + + row = df1.groupby("ColUmn").cogroup( + df2.groupby("COLUMN") + ).applyInPandas(lambda r, l: r + l, "column long, value long").first() + self.assertEquals(row.asDict(), Row(column=2, value=2).asDict()) + @staticmethod def _test_with_key(left, right, isLeft): diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py index ff53a0c..7611943 100644 --- a/python/pyspark/sql/tests/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py @@ -587,6 +587,16 @@ class GroupedMapInPandasTests(ReusedSQLTestCase): # Check that all group and window_range values from udf matched expected self.assertTrue(all([r[0] for r in result])) + def test_case_insensitive_grouping_column(self): + # SPARK-31915: case-insensitive grouping column should work. 
+ def my_pandas_udf(pdf): + return pdf.assign(score=0.5) + + df = self.spark.createDataFrame([[1, 1]], ["column", "score"]) + row = df.groupby('COLUMN').applyInPandas( + my_pandas_udf, schema="column integer, score float").first() + self.assertEquals(row.asDict(), Row(column=1, score=0.5).asDict()) + if __name__ == "__main__": from pyspark.sql.tests.test_pandas_grouped_map import * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index b1ba7d4..c37d8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -546,9 +546,10 @@ class RelationalGroupedDataset protected[sql]( case ne: NamedExpression => ne case other => Alias(other, other.toString)() } - val groupingAttributes = groupingNamedExpressions.map(_.toAttribute) val child = df.logicalPlan - val project = Project(groupingNamedExpressions ++ child.output, child) + val project = df.sparkSession.sessionState.executePlan( + Project(groupingNamedExpressions ++ child.output, child)).analyzed + val groupingAttributes = project.output.take(groupingNamedExpressions.length) val output = expr.dataType.asInstanceOf[StructType].toAttributes val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project) @@ -583,14 +584,16 @@ class RelationalGroupedDataset protected[sql]( case other => Alias(other, other.toString)() } - val leftAttributes = leftGroupingNamedExpressions.map(_.toAttribute) - val rightAttributes = rightGroupingNamedExpressions.map(_.toAttribute) - val leftChild = df.logicalPlan val rightChild = r.df.logicalPlan - val left = Project(leftGroupingNamedExpressions ++ leftChild.output, leftChild) - val right = Project(rightGroupingNamedExpressions ++ rightChild.output, rightChild) + val left = df.sparkSession.sessionState.executePlan( + Project(leftGroupingNamedExpressions 
++ leftChild.output, leftChild)).analyzed + val right = r.df.sparkSession.sessionState.executePlan( + Project(rightGroupingNamedExpressions ++ rightChild.output, rightChild)).analyzed + + val leftAttributes = left.output.take(leftGroupingNamedExpressions.length) + val rightAttributes = right.output.take(rightGroupingNamedExpressions.length) val output = expr.dataType.asInstanceOf[StructType].toAttributes val plan = FlatMapCoGroupsInPandas(leftAttributes, rightAttributes, expr, output, left, right) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org