This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c44020b961f [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns
c44020b961f is described below

commit c44020b961ffe44e30ee617af6ffb84effbd28fe
Author: Enrico Minack <git...@enrico.minack.dev>
AuthorDate: Wed Apr 13 17:07:27 2022 +0900

    [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns
    
    ### What changes were proposed in this pull request?
    Methods `wrap_cogrouped_map_pandas_udf` and `wrap_grouped_map_pandas_udf` in
    `python/pyspark/worker.py` do not need to reject a `pd.DataFrame` with no columns
    returned by the UDF when that DataFrame is empty (zero rows). This allows returning
    empty DataFrames without the need to define columns. The DataFrame is empty, after all!
    
    **The proposed behaviour is consistent with the current behaviour of
    `DataFrame.mapInPandas`.**
    
    ### Why are the changes needed?
    Returning an empty DataFrame from the function given to `applyInPandas` should
    be as easy as this:
    
    ```python
    return pd.DataFrame([])
    ```
    
    However, PySpark requires the empty DataFrame to have the right _number_ of
    columns. This seems redundant, as the schema is already defined in the
    `applyInPandas` call, and returning a non-empty DataFrame does not require
    defining columns.
    
    The behaviour of `applyInPandas` should be consistent with that of `mapInPandas`.
    
    Here is an example that reproduces the issue:
    ```python
    import pandas as pd
    
    from pyspark.sql.functions import pandas_udf, ceil
    
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    
    def mean_func(key, pdf):
        if key == (1,):
            return pd.DataFrame([])
        else:
            return pd.DataFrame([key + (pdf.v.mean(),)])
    
    df.groupby("id").applyInPandas(mean_func, schema="id long, v double").show()
    ```
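    
    With this change the example above prints only the row for group `2` (hand-derived
    from the sample data: `(3.0 + 5.0 + 10.0) / 3 = 6.0`); previously it failed with
    "Number of columns of the returned pandas.DataFrame doesn't match specified schema.
    Expected: 2 Actual: 0":
    
    ```
    +---+---+
    | id|  v|
    +---+---+
    |  2|6.0|
    +---+---+
    ```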
    
    ### Does this PR introduce _any_ user-facing change?
    It changes the behaviour of the following calls to allow returning an empty
    `pd.DataFrame` without defining columns. The PySpark DataFrame returned by
    `applyInPandas` is unchanged:
    
    - `df.groupby(…).applyInPandas(…)`
    - `df.cogroup(…).applyInPandas(…)`
    
    ### How was this patch tested?
    Tests are added that exercise `applyInPandas` and `mapInPandas` when the user function returns:
    
    - an empty DataFrame with no columns
    - an empty DataFrame with the wrong number of columns
    - a non-empty DataFrame with the wrong number of columns
    - something other than a `pd.DataFrame`
    
    NOTE: It is not an error for `mapInPandas` to return DataFrames with more
    columns than specified in the `mapInPandas` schema.
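    
    A minimal sketch of that `mapInPandas` behaviour (the function and column names are
    illustrative; the extra column appears to be ignored because columns are picked by
    name, which is also why the tests below expect `KeyError: 'value'` for a missing
    column):
    
    ```python
    import pandas as pd
    
    def with_extra_column(iterator):
        for pdf in iterator:
            pdf["extra"] = 0  # one more column than the declared schema
            yield pdf
    
    spark.range(10).mapInPandas(with_extra_column, "id long").count()  # 10, no error
    ```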
    
    Closes #36120 from EnricoMi/branch-empty-pd-dataframes.
    
    Authored-by: Enrico Minack <git...@enrico.minack.dev>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 556c74578eb2379fc6e0ec8d147674d0b10e5a2c)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 97 ++++++++++++++++++++++
 .../pyspark/sql/tests/test_pandas_grouped_map.py   | 76 +++++++++++++++++
 python/pyspark/sql/tests/test_pandas_map.py        | 71 ++++++++++++++--
 python/pyspark/worker.py                           | 12 ++-
 4 files changed, 246 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 58022fa6e83..3f403d9c9d6 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -20,6 +20,7 @@ from typing import cast
 
 from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf
 from pyspark.sql.types import DoubleType, StructType, StructField, Row
+from pyspark.sql.utils import PythonException
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
     have_pandas,
@@ -124,6 +125,102 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
 
         assert_frame_equal(expected, result)
 
+    def test_apply_in_pandas_not_returning_pandas_dataframe(self):
+        left = self.data1
+        right = self.data2
+
+        def merge_pandas(lft, rgt):
+            return lft.size + rgt.size
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Return type of the user-defined function should be 
pandas.DataFrame, "
+                "but is <class 'numpy.int64'>",
+            ):
+                (
+                    left.groupby("id")
+                    .cogroup(right.groupby("id"))
+                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
+                    .collect()
+                )
+
+    def test_apply_in_pandas_returning_wrong_number_of_columns(self):
+        left = self.data1
+        right = self.data2
+
+        def merge_pandas(lft, rgt):
+            if 0 in lft["id"] and lft["id"][0] % 2 == 0:
+                lft["add"] = 0
+            if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
+                rgt["more"] = 1
+            return pd.merge(lft, rgt, on=["id", "k"])
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Number of columns of the returned pandas.DataFrame "
+                "doesn't match specified schema. Expected: 4 Actual: 6",
+            ):
+                (
+                    # merge_pandas returns six columns for some keys while the schema specifies four
+                    left.groupby("id")
+                    .cogroup(right.groupby("id"))
+                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
+                    .collect()
+                )
+
+    def test_apply_in_pandas_returning_empty_dataframe(self):
+        left = self.data1
+        right = self.data2
+
+        def merge_pandas(lft, rgt):
+            if 0 in lft["id"] and lft["id"][0] % 2 == 0:
+                return pd.DataFrame([])
+            if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
+                return pd.DataFrame([])
+            return pd.merge(lft, rgt, on=["id", "k"])
+
+        result = (
+            left.groupby("id")
+            .cogroup(right.groupby("id"))
+            .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
+            .sort(["id", "k"])
+            .toPandas()
+        )
+
+        left = left.toPandas()
+        right = right.toPandas()
+
+        expected = pd.merge(
+            left[left["id"] % 2 != 0], right[right["id"] % 3 != 0], on=["id", 
"k"]
+        ).sort_values(by=["id", "k"])
+
+        assert_frame_equal(expected, result)
+
+    def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self):
+        left = self.data1
+        right = self.data2
+
+        def merge_pandas(lft, rgt):
+            if 0 in lft["id"] and lft["id"][0] % 2 == 0:
+                return pd.DataFrame([], columns=["id", "k"])
+            return pd.merge(lft, rgt, on=["id", "k"])
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Number of columns of the returned pandas.DataFrame doesn't "
+                "match specified schema. Expected: 4 Actual: 2",
+            ):
+                (
+                    # merge_pandas returns two columns for even keys while we set schema to four
+                    left.groupby("id")
+                    .cogroup(right.groupby("id"))
+                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
+                    .collect()
+                )
+
     def test_mixed_scalar_udfs_followed_by_cogrouby_apply(self):
         df = self.spark.range(0, 10).toDF("v1")
         df = df.withColumn("v2", udf(lambda x: x + 1, 
"int")(df["v1"])).withColumn(
diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py
index bc1593069ed..4fd5207f73a 100644
--- a/python/pyspark/sql/tests/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py
@@ -51,6 +51,7 @@ from pyspark.sql.types import (
     NullType,
     TimestampType,
 )
+from pyspark.sql.utils import PythonException
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
     have_pandas,
@@ -268,6 +269,81 @@ class GroupedMapInPandasTests(ReusedSQLTestCase):
         expected = expected.assign(norm=expected.norm.astype("float64"))
         assert_frame_equal(expected, result)
 
+    def test_apply_in_pandas_not_returning_pandas_dataframe(self):
+        df = self.data
+
+        def stats(key, _):
+            return key
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Return type of the user-defined function should be 
pandas.DataFrame, "
+                "but is <class 'tuple'>",
+            ):
+                df.groupby("id").applyInPandas(stats, schema="id integer, m 
double").collect()
+
+    def test_apply_in_pandas_returning_wrong_number_of_columns(self):
+        df = self.data
+
+        def stats(key, pdf):
+            v = pdf.v
+            # returning three columns
+            res = pd.DataFrame([key + (v.mean(), v.std())])
+            return res
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Number of columns of the returned pandas.DataFrame doesn't 
match "
+                "specified schema. Expected: 2 Actual: 3",
+            ):
+                # stats returns three columns while here we set schema with two columns
+                df.groupby("id").applyInPandas(stats, schema="id integer, m double").collect()
+
+    def test_apply_in_pandas_returning_empty_dataframe(self):
+        df = self.data
+
+        def odd_means(key, pdf):
+            if key[0] % 2 == 0:
+                return pd.DataFrame([])
+            else:
+                return pd.DataFrame([key + (pdf.v.mean(),)])
+
+        expected_ids = {row[0] for row in self.data.collect() if row[0] % 2 != 0}
+
+        result = (
+            df.groupby("id")
+            .applyInPandas(odd_means, schema="id integer, m double")
+            .sort("id", "m")
+            .collect()
+        )
+
+        actual_ids = {row[0] for row in result}
+        self.assertSetEqual(expected_ids, actual_ids)
+
+        self.assertEqual(len(expected_ids), len(result))
+        for row in result:
+            self.assertEqual(24.5, row[1])
+
+    def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self):
+        df = self.data
+
+        def odd_means(key, pdf):
+            if key[0] % 2 == 0:
+                return pd.DataFrame([], columns=["id"])
+            else:
+                return pd.DataFrame([key + (pdf.v.mean(),)])
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Number of columns of the returned pandas.DataFrame doesn't 
match "
+                "specified schema. Expected: 2 Actual: 1",
+            ):
+                # odd_means returns one column for even keys while here we set schema with two columns
+                df.groupby("id").applyInPandas(odd_means, schema="id integer, m double").collect()
+
     def test_datatype_string(self):
         df = self.data
 
diff --git a/python/pyspark/sql/tests/test_pandas_map.py b/python/pyspark/sql/tests/test_pandas_map.py
index 360d20050a3..11da879da38 100644
--- a/python/pyspark/sql/tests/test_pandas_map.py
+++ b/python/pyspark/sql/tests/test_pandas_map.py
@@ -22,6 +22,8 @@ import unittest
 from typing import cast
 
 from pyspark.sql import Row
+from pyspark.sql.functions import lit
+from pyspark.sql.utils import PythonException
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
     have_pandas,
@@ -29,6 +31,7 @@ from pyspark.testing.sqlutils import (
     pandas_requirement_message,
     pyarrow_requirement_message,
 )
+from pyspark.testing.utils import QuietTest
 
 if have_pandas:
     import pandas as pd
@@ -60,14 +63,14 @@ class MapInPandasTests(ReusedSQLTestCase):
         time.tzset()
         ReusedSQLTestCase.tearDownClass()
 
-    def test_map_partitions_in_pandas(self):
+    def test_map_in_pandas(self):
         def func(iterator):
             for pdf in iterator:
                 assert isinstance(pdf, pd.DataFrame)
                 assert pdf.columns == ["id"]
                 yield pdf
 
-        df = self.spark.range(10)
+        df = self.spark.range(10, numPartitions=3)
         actual = df.mapInPandas(func, "id long").collect()
         expected = df.collect()
         self.assertEqual(actual, expected)
@@ -95,17 +98,69 @@ class MapInPandasTests(ReusedSQLTestCase):
         actual = df.repartition(1).mapInPandas(func, "a long").collect()
         self.assertEqual(set((r.a for r in actual)), set(range(100)))
 
+    def test_other_than_dataframe(self):
+        def bad_iter(_):
+            return iter([1])
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "Return type of the user-defined function should be 
Pandas.DataFrame, "
+                "but is <class 'int'>",
+            ):
+                (
+                    self.spark.range(10, numPartitions=3)
+                    .mapInPandas(bad_iter, "a int, b string")
+                    .count()
+                )
+
     def test_empty_iterator(self):
         def empty_iter(_):
             return iter([])
 
-        self.assertEqual(self.spark.range(10).mapInPandas(empty_iter, "a int, b string").count(), 0)
+        mapped = self.spark.range(10, numPartitions=3).mapInPandas(empty_iter, "a int, b string")
+        self.assertEqual(mapped.count(), 0)
 
-    def test_empty_rows(self):
-        def empty_rows(_):
+    def test_empty_dataframes(self):
+        def empty_dataframes(_):
             return iter([pd.DataFrame({"a": []})])
 
-        self.assertEqual(self.spark.range(10).mapInPandas(empty_rows, "a int").count(), 0)
+        mapped = self.spark.range(10, numPartitions=3).mapInPandas(empty_dataframes, "a int")
+        self.assertEqual(mapped.count(), 0)
+
+    def test_empty_dataframes_without_columns(self):
+        def empty_dataframes_wo_columns(iterator):
+            for pdf in iterator:
+                yield pdf
+            # after yielding all elements of the iterator, also yield one dataframe without columns
+            yield pd.DataFrame([])
+
+        mapped = (
+            self.spark.range(10, numPartitions=3)
+            .toDF("id")
+            .mapInPandas(empty_dataframes_wo_columns, "id int")
+        )
+        self.assertEqual(mapped.count(), 10)
+
+    def test_empty_dataframes_with_less_columns(self):
+        def empty_dataframes_with_less_columns(iterator):
+            for pdf in iterator:
+                yield pdf
+            # after yielding all elements of the iterator, also yield a dataframe with fewer columns
+            yield pd.DataFrame([(1,)], columns=["id"])
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegex(
+                PythonException,
+                "KeyError: 'value'",
+            ):
+                (
+                    self.spark.range(10, numPartitions=3)
+                    .withColumn("value", lit(0))
+                    .toDF("id", "value")
+                    .mapInPandas(empty_dataframes_with_less_columns, "id int, value int")
+                    .collect()
+                )
 
     def test_chain_map_partitions_in_pandas(self):
         def func(iterator):
@@ -114,14 +169,14 @@ class MapInPandasTests(ReusedSQLTestCase):
                 assert pdf.columns == ["id"]
                 yield pdf
 
-        df = self.spark.range(10)
+        df = self.spark.range(10, numPartitions=3)
         actual = df.mapInPandas(func, "id long").mapInPandas(func, "id long").collect()
         expected = df.collect()
         self.assertEqual(actual, expected)
 
     def test_self_join(self):
         # SPARK-34319: self-join with MapInPandas
-        df1 = self.spark.range(10)
+        df1 = self.spark.range(10, numPartitions=3)
         df2 = df1.mapInPandas(lambda iter: iter, "id long")
         actual = df2.join(df2).collect()
         expected = df1.join(df1).collect()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8784abfb333..c486b7bed1d 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -162,7 +162,11 @@ def wrap_cogrouped_map_pandas_udf(f, return_type, argspec):
                 "Return type of the user-defined function should be "
                 "pandas.DataFrame, but is {}".format(type(result))
             )
-        if not len(result.columns) == len(return_type):
+        # the number of columns of result has to match the return type,
+        # but it is fine for result to have no columns at all if it is empty
+        if not (
+            len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty
+        ):
             raise RuntimeError(
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "
@@ -188,7 +192,11 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec):
                 "Return type of the user-defined function should be "
                 "pandas.DataFrame, but is {}".format(type(result))
             )
-        if not len(result.columns) == len(return_type):
+        # the number of columns of result has to match the return type,
+        # but it is fine for result to have no columns at all if it is empty
+        if not (
+            len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty
+        ):
             raise RuntimeError(
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "

