EnricoMi commented on code in PR #38223:
URL: https://github.com/apache/spark/pull/38223#discussion_r1015075103


##########
python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py:
##########
@@ -165,100 +148,191 @@ def merge_pandas(lft, _):
                 )
 
     def test_apply_in_pandas_not_returning_pandas_dataframe(self):
-        left = self.data1
-        right = self.data2
+        self._test_merge_error(
+            fn=lambda lft, rgt: lft.size + rgt.size,
+            error_class=PythonException,
+            error_message_regex="Return type of the user-defined function "
+            "should be pandas.DataFrame, but is <class 'numpy.int64'>",
+        )
+
+    def test_apply_in_pandas_returning_column_names(self):
+        self._test_merge(fn=lambda lft, rgt: pd.merge(lft, rgt, on=["id", "k"]))
 
+    def test_apply_in_pandas_returning_no_column_names(self):
         def merge_pandas(lft, rgt):
-            return lft.size + rgt.size
+            res = pd.merge(lft, rgt, on=["id", "k"])
+            res.columns = range(res.columns.size)
+            return res
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegex(
-                PythonException,
-                "Return type of the user-defined function should be pandas.DataFrame, "
-                "but is <class 'numpy.int64'>",
-            ):
-                (
-                    left.groupby("id")
-                    .cogroup(right.groupby("id"))
-                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-                    .collect()
-                )
+        self._test_merge(fn=merge_pandas)
 
-    def test_apply_in_pandas_returning_wrong_number_of_columns(self):
-        left = self.data1
-        right = self.data2
+    def test_apply_in_pandas_returning_column_names_sometimes(self):
+        def merge_pandas(lft, rgt):
+            res = pd.merge(lft, rgt, on=["id", "k"])
+            if 0 in lft["id"] and lft["id"][0] % 2 == 0:
+                return res
+            res.columns = range(res.columns.size)
+            return res
+
+        self._test_merge(fn=merge_pandas)
 
+    def test_apply_in_pandas_returning_wrong_column_names(self):
         def merge_pandas(lft, rgt):
             if 0 in lft["id"] and lft["id"][0] % 2 == 0:
                 lft["add"] = 0
             if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
                 rgt["more"] = 1
             return pd.merge(lft, rgt, on=["id", "k"])
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegex(
-                PythonException,
-                "Number of columns of the returned pandas.DataFrame "
-                "doesn't match specified schema. Expected: 4 Actual: 6",
-            ):
-                (
-                    # merge_pandas returns two columns for even keys while we set schema to four
-                    left.groupby("id")
-                    .cogroup(right.groupby("id"))
-                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-                    .collect()
-                )
+        self._test_merge_error(
+            fn=merge_pandas,
+            error_class=PythonException,
+            error_message_regex="Column names of the returned pandas.DataFrame "
+            "do not match specified schema.  Unexpected: add, more  Schema: id, k, v, v2\n",
+        )
 
-    def test_apply_in_pandas_returning_empty_dataframe(self):
-        left = self.data1
-        right = self.data2
+        # with very large schema, missing and unexpected is limited to 5
+        # and the schema is abbreviated in the error message
+        schema = "id long, k long, mean double, " + ", ".join(
+            f"column_with_long_column_name_{no} integer" for no in range(35)
+        )
+        self._test_merge_error(
+            fn=lambda lft, rgt: pd.DataFrame(
+                [
+                    (
+                        lft.id,
+                        lft.k,
+                        lft.v.mean(),
+                    )
+                    + tuple(lft.v.mean() for _ in range(7))
+                ],
+                columns=["id", "k", "mean"] + [f"extra_column_{no} integer" for no in range(7)],
+            ),
+            output_schema=schema,
+            error_class=PythonException,
+            error_message_regex="Column names of the returned pandas\\.DataFrame "
+            "do not match specified schema\\.  "
+            "Missing \\(first 5 of 35\\): column_with_long_column_name_0,"
+            " column_with_long_column_name_1, column_with_long_column_name_10,"
+            " column_with_long_column_name_11, column_with_long_column_name_12  "
+            "Unexpected \\(first 5 of 7\\): extra_column_0 integer, extra_column_1 integer,"
+            " extra_column_2 integer, extra_column_3 integer, extra_column_4 integer  "
+            "Schema: id, k, mean, column_with_long_column_name_0, column_with_long_column_name_1,"
+            " column_with_long_column_name_2, column_with_long_column_name_3,"
+            " column_with_long_column_name_4, column_with_long_column_name_5,"
+            " column_with_long_column_name_6, column_with_long_column_name_7,"
+            " column_with_long_column_name_8, column_with_long_column_name_9,"
+            " column_with_long_column_name_10, column_with_long_column_name_11,"
+            " column_with_long_column_name_12, column_with_long_column_name_13,"
+            " column_with_long_column_name_14, column_with_\\.\\.\\.g_column_name_19,"
+            " column_with_long_column_name_20, column_with_long_column_name_21,"
+            " column_with_long_column_name_22, column_with_long_column_name_23,"
+            " column_with_long_column_name_24, column_with_long_column_name_25,"
+            " column_with_long_column_name_26, column_with_long_column_name_27,"
+            " column_with_long_column_name_28, column_with_long_column_name_29,"
+            " column_with_long_column_name_30, column_with_long_column_name_31,"
+            " column_with_long_column_name_32, column_with_long_column_name_33,"
+            " column_with_long_column_name_34\n",
+        )
 
+    def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
         def merge_pandas(lft, rgt):
             if 0 in lft["id"] and lft["id"][0] % 2 == 0:
-                return pd.DataFrame([])
+                lft[3] = 0
             if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
-                return pd.DataFrame([])
-            return pd.merge(lft, rgt, on=["id", "k"])
-
-        result = (
-            left.groupby("id")
-            .cogroup(right.groupby("id"))
-            .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-            .sort(["id", "k"])
-            .toPandas()
+                rgt[3] = 1
+            res = pd.merge(lft, rgt, on=["id", "k"])
+            res.columns = range(res.columns.size)
+            return res
+
+        self._test_merge_error(
+            fn=merge_pandas,
+            error_class=PythonException,
+            error_message_regex="Number of columns of the returned pandas.DataFrame "
+            "doesn't match specified schema.  Expected: 4  Actual: 6  Schema: id, k, v, v2\n",
         )
 
-        left = left.toPandas()
-        right = right.toPandas()
-
-        expected = pd.merge(
-            left[left["id"] % 2 != 0], right[right["id"] % 3 != 0], on=["id", "k"]
-        ).sort_values(by=["id", "k"])
+        # with very large schema the schema is abbreviated in the error message
+        schema = "id long, k long, mean double, " + ", ".join(
+            f"column_with_long_column_name_{no} integer" for no in range(35)
+        )
 
-        assert_frame_equal(expected, result)
+        def fn(lft, _):
+            # remove column names from lft DataFrame
+            lft.columns = range(lft.columns.size)
+            return lft
 
-    def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self):
-        left = self.data1
-        right = self.data2
+        self._test_merge_error(
+            fn=fn,
+            output_schema=schema,
+            error_class=PythonException,
+            error_message_regex="Number of columns of the returned pandas\\.DataFrame "
+            "doesn't match specified schema\\.  Expected: 38  Actual: 3  "
+            "Schema: id, k, mean, column_with_long_column_name_0, column_with_long_column_name_1,"
+            " column_with_long_column_name_2, column_with_long_column_name_3,"
+            " column_with_long_column_name_4, column_with_long_column_name_5,"
+            " column_with_long_column_name_6, column_with_long_column_name_7,"
+            " column_with_long_column_name_8, column_with_long_column_name_9,"
+            " column_with_long_column_name_10, column_with_long_column_name_11,"
+            " column_with_long_column_name_12, column_with_long_column_name_13,"
+            " column_with_long_column_name_14, column_with_\\.\\.\\.g_column_name_19,"
+            " column_with_long_column_name_20, column_with_long_column_name_21,"
+            " column_with_long_column_name_22, column_with_long_column_name_23,"
+            " column_with_long_column_name_24, column_with_long_column_name_25,"
+            " column_with_long_column_name_26, column_with_long_column_name_27,"
+            " column_with_long_column_name_28, column_with_long_column_name_29,"
+            " column_with_long_column_name_30, column_with_long_column_name_31,"
+            " column_with_long_column_name_32, column_with_long_column_name_33,"
+            " column_with_long_column_name_34\n",
+        )
 
+    def test_apply_in_pandas_returning_empty_dataframe(self):
         def merge_pandas(lft, rgt):
             if 0 in lft["id"] and lft["id"][0] % 2 == 0:
-                return pd.DataFrame([], columns=["id", "k"])
+                return pd.DataFrame()
+            if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
+                return pd.DataFrame()
             return pd.merge(lft, rgt, on=["id", "k"])
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegex(
-                PythonException,
-                "Number of columns of the returned pandas.DataFrame doesn't "
-                "match specified schema. Expected: 4 Actual: 2",
-            ):
-                (
-                    # merge_pandas returns two columns for even keys while we set schema to four
-                    left.groupby("id")
-                    .cogroup(right.groupby("id"))
-                    .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
-                    .collect()
-                )
+        self._test_merge_empty(fn=merge_pandas)
+
+    def test_apply_in_pandas_returning_incompatible_type(self):
+        for safely in [True, False]:
+            with self.subTest(convertToArrowArraySafely=safely), self.sql_conf(
+                {"spark.sql.execution.pandas.convertToArrowArraySafely": safely}
+            ), QuietTest(self.sc):
+                # sometimes we see ValueErrors
+                with self.subTest(convert="string to double"):
+                    expected = (
+                        "ValueError: Exception thrown when converting pandas\\.Series \\(object\\) "
+                        "with name 'k' to Arrow Array \\(double\\)\\."

Review Comment:
   done



##########
python/pyspark/worker.py:
##########
@@ -159,27 +226,13 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se
             key_series = left_key_series if not left_df.empty else right_key_series
             key = tuple(s[0] for s in key_series)
             result = f(key, left_df, right_df)
-        if not isinstance(result, pd.DataFrame):
-            raise TypeError(
-                "Return type of the user-defined function should be "
-                "pandas.DataFrame, but is {}".format(type(result))
-            )
-        # the number of columns of result have to match the return type
-        # but it is fine for result to have no columns at all if it is empty
-        if not (
-            len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty
-        ):
-            raise RuntimeError(
-                "Number of columns of the returned pandas.DataFrame "
-                "doesn't match specified schema. "
-                "Expected: {} Actual: {}".format(len(return_type), len(result.columns))
-            )
+        verify_pandas_result(result, return_type, assign_cols_by_name(runner_conf))

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to