This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new c44020b961f [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns c44020b961f is described below commit c44020b961ffe44e30ee617af6ffb84effbd28fe Author: Enrico Minack <git...@enrico.minack.dev> AuthorDate: Wed Apr 13 17:07:27 2022 +0900 [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns ### What changes were proposed in this pull request? Methods `wrap_cogrouped_map_pandas_udf` and `wrap_grouped_map_pandas_udf` in `python/pyspark/worker.py` do not need to reject `pd.DataFrame`s with no columns returned by the UDF when that DataFrame is empty (zero rows). This allows returning empty DataFrames without the need to define columns. The DataFrame is empty after all! **The proposed behaviour is consistent with the current behaviour of `DataFrame.mapInPandas`.** ### Why are the changes needed? Returning an empty DataFrame from the lambda given to `applyInPandas` should be as easy as this: ```python return pd.DataFrame([]) ``` However, PySpark requires that empty DataFrame to have the right _number_ of columns. This seems redundant as the schema is already defined in the `applyInPandas` call. Returning a non-empty DataFrame does not require defining columns. Behaviour of `applyInPandas` should be consistent with `mapInPandas`. Here is an example to reproduce: ```python import pandas as pd from pyspark.sql.functions import pandas_udf, ceil df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) def mean_func(key, pdf): if key == (1,): return pd.DataFrame([]) else: return pd.DataFrame([key + (pdf.v.mean(),)]) df.groupby("id").applyInPandas(mean_func, schema="id long, v double").show() ``` ### Does this PR introduce _any_ user-facing change? It changes the behaviour of the following calls to allow returning empty `pd.DataFrame` without defining columns. 
The PySpark DataFrame returned by `applyInPandas` is unchanged: - `df.groupby(…).applyInPandas(…)` - `df.cogroup(…).applyInPandas(…)` ### How was this patch tested? Tests are added that test `applyInPandas` and `mapInPandas` when returning - empty DataFrame with no columns - empty DataFrame with the wrong number of columns - non-empty DataFrame with wrong number of columns - something other than `pd.DataFrame` NOTE: It is not an error for `mapInPandas` to return DataFrames with more columns than specified in the `mapInPandas` schema. Closes #36120 from EnricoMi/branch-empty-pd-dataframes. Authored-by: Enrico Minack <git...@enrico.minack.dev> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 556c74578eb2379fc6e0ec8d147674d0b10e5a2c) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 97 ++++++++++++++++++++++ .../pyspark/sql/tests/test_pandas_grouped_map.py | 76 +++++++++++++++++ python/pyspark/sql/tests/test_pandas_map.py | 71 ++++++++++++++-- python/pyspark/worker.py | 12 ++- 4 files changed, 246 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py index 58022fa6e83..3f403d9c9d6 100644 --- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py @@ -20,6 +20,7 @@ from typing import cast from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf from pyspark.sql.types import DoubleType, StructType, StructField, Row +from pyspark.sql.utils import PythonException from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -124,6 +125,102 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase): assert_frame_equal(expected, result) + def test_apply_in_pandas_not_returning_pandas_dataframe(self): + left = self.data1 + right = self.data2 + + def merge_pandas(lft, rgt): + return lft.size + rgt.size + + 
with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be pandas.DataFrame, " + "but is <class 'numpy.int64'>", + ): + ( + left.groupby("id") + .cogroup(right.groupby("id")) + .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") + .collect() + ) + + def test_apply_in_pandas_returning_wrong_number_of_columns(self): + left = self.data1 + right = self.data2 + + def merge_pandas(lft, rgt): + if 0 in lft["id"] and lft["id"][0] % 2 == 0: + lft["add"] = 0 + if 0 in rgt["id"] and rgt["id"][0] % 3 == 0: + rgt["more"] = 1 + return pd.merge(lft, rgt, on=["id", "k"]) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Number of columns of the returned pandas.DataFrame " + "doesn't match specified schema. Expected: 4 Actual: 6", + ): + ( + # merge_pandas returns two columns for even keys while we set schema to four + left.groupby("id") + .cogroup(right.groupby("id")) + .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") + .collect() + ) + + def test_apply_in_pandas_returning_empty_dataframe(self): + left = self.data1 + right = self.data2 + + def merge_pandas(lft, rgt): + if 0 in lft["id"] and lft["id"][0] % 2 == 0: + return pd.DataFrame([]) + if 0 in rgt["id"] and rgt["id"][0] % 3 == 0: + return pd.DataFrame([]) + return pd.merge(lft, rgt, on=["id", "k"]) + + result = ( + left.groupby("id") + .cogroup(right.groupby("id")) + .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") + .sort(["id", "k"]) + .toPandas() + ) + + left = left.toPandas() + right = right.toPandas() + + expected = pd.merge( + left[left["id"] % 2 != 0], right[right["id"] % 3 != 0], on=["id", "k"] + ).sort_values(by=["id", "k"]) + + assert_frame_equal(expected, result) + + def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self): + left = self.data1 + right = self.data2 + + def merge_pandas(lft, rgt): + if 0 in lft["id"] and lft["id"][0] % 2 == 0: + return 
pd.DataFrame([], columns=["id", "k"]) + return pd.merge(lft, rgt, on=["id", "k"]) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Number of columns of the returned pandas.DataFrame doesn't " + "match specified schema. Expected: 4 Actual: 2", + ): + ( + # merge_pandas returns two columns for even keys while we set schema to four + left.groupby("id") + .cogroup(right.groupby("id")) + .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") + .collect() + ) + def test_mixed_scalar_udfs_followed_by_cogrouby_apply(self): df = self.spark.range(0, 10).toDF("v1") df = df.withColumn("v2", udf(lambda x: x + 1, "int")(df["v1"])).withColumn( diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py index bc1593069ed..4fd5207f73a 100644 --- a/python/pyspark/sql/tests/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py @@ -51,6 +51,7 @@ from pyspark.sql.types import ( NullType, TimestampType, ) +from pyspark.sql.utils import PythonException from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -268,6 +269,81 @@ class GroupedMapInPandasTests(ReusedSQLTestCase): expected = expected.assign(norm=expected.norm.astype("float64")) assert_frame_equal(expected, result) + def test_apply_in_pandas_not_returning_pandas_dataframe(self): + df = self.data + + def stats(key, _): + return key + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be pandas.DataFrame, " + "but is <class 'tuple'>", + ): + df.groupby("id").applyInPandas(stats, schema="id integer, m double").collect() + + def test_apply_in_pandas_returning_wrong_number_of_columns(self): + df = self.data + + def stats(key, pdf): + v = pdf.v + # returning three columns + res = pd.DataFrame([key + (v.mean(), v.std())]) + return res + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + 
"Number of columns of the returned pandas.DataFrame doesn't match " + "specified schema. Expected: 2 Actual: 3", + ): + # stats returns three columns while here we set schema with two columns + df.groupby("id").applyInPandas(stats, schema="id integer, m double").collect() + + def test_apply_in_pandas_returning_empty_dataframe(self): + df = self.data + + def odd_means(key, pdf): + if key[0] % 2 == 0: + return pd.DataFrame([]) + else: + return pd.DataFrame([key + (pdf.v.mean(),)]) + + expected_ids = {row[0] for row in self.data.collect() if row[0] % 2 != 0} + + result = ( + df.groupby("id") + .applyInPandas(odd_means, schema="id integer, m double") + .sort("id", "m") + .collect() + ) + + actual_ids = {row[0] for row in result} + self.assertSetEqual(expected_ids, actual_ids) + + self.assertEqual(len(expected_ids), len(result)) + for row in result: + self.assertEqual(24.5, row[1]) + + def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self): + df = self.data + + def odd_means(key, pdf): + if key[0] % 2 == 0: + return pd.DataFrame([], columns=["id"]) + else: + return pd.DataFrame([key + (pdf.v.mean(),)]) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Number of columns of the returned pandas.DataFrame doesn't match " + "specified schema. 
Expected: 2 Actual: 1", + ): + # stats returns one column for even keys while here we set schema with two columns + df.groupby("id").applyInPandas(odd_means, schema="id integer, m double").collect() + def test_datatype_string(self): df = self.data diff --git a/python/pyspark/sql/tests/test_pandas_map.py b/python/pyspark/sql/tests/test_pandas_map.py index 360d20050a3..11da879da38 100644 --- a/python/pyspark/sql/tests/test_pandas_map.py +++ b/python/pyspark/sql/tests/test_pandas_map.py @@ -22,6 +22,8 @@ import unittest from typing import cast from pyspark.sql import Row +from pyspark.sql.functions import lit +from pyspark.sql.utils import PythonException from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -29,6 +31,7 @@ from pyspark.testing.sqlutils import ( pandas_requirement_message, pyarrow_requirement_message, ) +from pyspark.testing.utils import QuietTest if have_pandas: import pandas as pd @@ -60,14 +63,14 @@ class MapInPandasTests(ReusedSQLTestCase): time.tzset() ReusedSQLTestCase.tearDownClass() - def test_map_partitions_in_pandas(self): + def test_map_in_pandas(self): def func(iterator): for pdf in iterator: assert isinstance(pdf, pd.DataFrame) assert pdf.columns == ["id"] yield pdf - df = self.spark.range(10) + df = self.spark.range(10, numPartitions=3) actual = df.mapInPandas(func, "id long").collect() expected = df.collect() self.assertEqual(actual, expected) @@ -95,17 +98,69 @@ class MapInPandasTests(ReusedSQLTestCase): actual = df.repartition(1).mapInPandas(func, "a long").collect() self.assertEqual(set((r.a for r in actual)), set(range(100))) + def test_other_than_dataframe(self): + def bad_iter(_): + return iter([1]) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be Pandas.DataFrame, " + "but is <class 'int'>", + ): + ( + self.spark.range(10, numPartitions=3) + .mapInPandas(bad_iter, "a int, b string") + .count() + ) + def 
test_empty_iterator(self): def empty_iter(_): return iter([]) - self.assertEqual(self.spark.range(10).mapInPandas(empty_iter, "a int, b string").count(), 0) + mapped = self.spark.range(10, numPartitions=3).mapInPandas(empty_iter, "a int, b string") + self.assertEqual(mapped.count(), 0) - def test_empty_rows(self): - def empty_rows(_): + def test_empty_dataframes(self): + def empty_dataframes(_): return iter([pd.DataFrame({"a": []})]) - self.assertEqual(self.spark.range(10).mapInPandas(empty_rows, "a int").count(), 0) + mapped = self.spark.range(10, numPartitions=3).mapInPandas(empty_dataframes, "a int") + self.assertEqual(mapped.count(), 0) + + def test_empty_dataframes_without_columns(self): + def empty_dataframes_wo_columns(iterator): + for pdf in iterator: + yield pdf + # after yielding all elements of the iterator, also yield one dataframe without columns + yield pd.DataFrame([]) + + mapped = ( + self.spark.range(10, numPartitions=3) + .toDF("id") + .mapInPandas(empty_dataframes_wo_columns, "id int") + ) + self.assertEqual(mapped.count(), 10) + + def test_empty_dataframes_with_less_columns(self): + def empty_dataframes_with_less_columns(iterator): + for pdf in iterator: + yield pdf + # after yielding all elements of the iterator, also yield a dataframe with less columns + yield pd.DataFrame([(1,)], columns=["id"]) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "KeyError: 'value'", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("value", lit(0)) + .toDF("id", "value") + .mapInPandas(empty_dataframes_with_less_columns, "id int, value int") + .collect() + ) def test_chain_map_partitions_in_pandas(self): def func(iterator): @@ -114,14 +169,14 @@ class MapInPandasTests(ReusedSQLTestCase): assert pdf.columns == ["id"] yield pdf - df = self.spark.range(10) + df = self.spark.range(10, numPartitions=3) actual = df.mapInPandas(func, "id long").mapInPandas(func, "id long").collect() expected = df.collect() 
self.assertEqual(actual, expected) def test_self_join(self): # SPARK-34319: self-join with MapInPandas - df1 = self.spark.range(10) + df1 = self.spark.range(10, numPartitions=3) df2 = df1.mapInPandas(lambda iter: iter, "id long") actual = df2.join(df2).collect() expected = df1.join(df1).collect() diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8784abfb333..c486b7bed1d 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -162,7 +162,11 @@ def wrap_cogrouped_map_pandas_udf(f, return_type, argspec): "Return type of the user-defined function should be " "pandas.DataFrame, but is {}".format(type(result)) ) - if not len(result.columns) == len(return_type): + # the number of columns of result have to match the return type + # but it is fine for result to have no columns at all if it is empty + if not ( + len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty + ): raise RuntimeError( "Number of columns of the returned pandas.DataFrame " "doesn't match specified schema. " @@ -188,7 +192,11 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec): "Return type of the user-defined function should be " "pandas.DataFrame, but is {}".format(type(result)) ) - if not len(result.columns) == len(return_type): + # the number of columns of result have to match the return type + # but it is fine for result to have no columns at all if it is empty + if not ( + len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty + ): raise RuntimeError( "Number of columns of the returned pandas.DataFrame " "doesn't match specified schema. " --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org