EnricoMi commented on code in PR #38223: URL: https://github.com/apache/spark/pull/38223#discussion_r1015075103
########## python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py: ########## @@ -165,100 +148,191 @@ def merge_pandas(lft, _): ) def test_apply_in_pandas_not_returning_pandas_dataframe(self): - left = self.data1 - right = self.data2 + self._test_merge_error( + fn=lambda lft, rgt: lft.size + rgt.size, + error_class=PythonException, + error_message_regex="Return type of the user-defined function " + "should be pandas.DataFrame, but is <class 'numpy.int64'>", + ) + + def test_apply_in_pandas_returning_column_names(self): + self._test_merge(fn=lambda lft, rgt: pd.merge(lft, rgt, on=["id", "k"])) + def test_apply_in_pandas_returning_no_column_names(self): def merge_pandas(lft, rgt): - return lft.size + rgt.size + res = pd.merge(lft, rgt, on=["id", "k"]) + res.columns = range(res.columns.size) + return res - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "Return type of the user-defined function should be pandas.DataFrame, " - "but is <class 'numpy.int64'>", - ): - ( - left.groupby("id") - .cogroup(right.groupby("id")) - .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") - .collect() - ) + self._test_merge(fn=merge_pandas) - def test_apply_in_pandas_returning_wrong_number_of_columns(self): - left = self.data1 - right = self.data2 + def test_apply_in_pandas_returning_column_names_sometimes(self): + def merge_pandas(lft, rgt): + res = pd.merge(lft, rgt, on=["id", "k"]) + if 0 in lft["id"] and lft["id"][0] % 2 == 0: + return res + res.columns = range(res.columns.size) + return res + + self._test_merge(fn=merge_pandas) + def test_apply_in_pandas_returning_wrong_column_names(self): def merge_pandas(lft, rgt): if 0 in lft["id"] and lft["id"][0] % 2 == 0: lft["add"] = 0 if 0 in rgt["id"] and rgt["id"][0] % 3 == 0: rgt["more"] = 1 return pd.merge(lft, rgt, on=["id", "k"]) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "Number of columns of the returned pandas.DataFrame " - "doesn't match specified 
schema. Expected: 4 Actual: 6", - ): - ( - # merge_pandas returns two columns for even keys while we set schema to four - left.groupby("id") - .cogroup(right.groupby("id")) - .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") - .collect() - ) + self._test_merge_error( + fn=merge_pandas, + error_class=PythonException, + error_message_regex="Column names of the returned pandas.DataFrame " + "do not match specified schema. Unexpected: add, more Schema: id, k, v, v2\n", + ) - def test_apply_in_pandas_returning_empty_dataframe(self): - left = self.data1 - right = self.data2 + # with very large schema, missing and unexpected is limited to 5 + # and the schema is abbreviated in the error message + schema = "id long, k long, mean double, " + ", ".join( + f"column_with_long_column_name_{no} integer" for no in range(35) + ) + self._test_merge_error( + fn=lambda lft, rgt: pd.DataFrame( + [ + ( + lft.id, + lft.k, + lft.v.mean(), + ) + + tuple(lft.v.mean() for _ in range(7)) + ], + columns=["id", "k", "mean"] + [f"extra_column_{no} integer" for no in range(7)], + ), + output_schema=schema, + error_class=PythonException, + error_message_regex="Column names of the returned pandas\\.DataFrame " + "do not match specified schema\\. 
" + "Missing \\(first 5 of 35\\): column_with_long_column_name_0," + " column_with_long_column_name_1, column_with_long_column_name_10," + " column_with_long_column_name_11, column_with_long_column_name_12 " + "Unexpected \\(first 5 of 7\\): extra_column_0 integer, extra_column_1 integer," + " extra_column_2 integer, extra_column_3 integer, extra_column_4 integer " + "Schema: id, k, mean, column_with_long_column_name_0, column_with_long_column_name_1," + " column_with_long_column_name_2, column_with_long_column_name_3," + " column_with_long_column_name_4, column_with_long_column_name_5," + " column_with_long_column_name_6, column_with_long_column_name_7," + " column_with_long_column_name_8, column_with_long_column_name_9," + " column_with_long_column_name_10, column_with_long_column_name_11," + " column_with_long_column_name_12, column_with_long_column_name_13," + " column_with_long_column_name_14, column_with_\\.\\.\\.g_column_name_19," + " column_with_long_column_name_20, column_with_long_column_name_21," + " column_with_long_column_name_22, column_with_long_column_name_23," + " column_with_long_column_name_24, column_with_long_column_name_25," + " column_with_long_column_name_26, column_with_long_column_name_27," + " column_with_long_column_name_28, column_with_long_column_name_29," + " column_with_long_column_name_30, column_with_long_column_name_31," + " column_with_long_column_name_32, column_with_long_column_name_33," + " column_with_long_column_name_34\n", + ) + def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self): def merge_pandas(lft, rgt): if 0 in lft["id"] and lft["id"][0] % 2 == 0: - return pd.DataFrame([]) + lft[3] = 0 if 0 in rgt["id"] and rgt["id"][0] % 3 == 0: - return pd.DataFrame([]) - return pd.merge(lft, rgt, on=["id", "k"]) - - result = ( - left.groupby("id") - .cogroup(right.groupby("id")) - .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") - .sort(["id", "k"]) - .toPandas() + rgt[3] = 1 + res = pd.merge(lft, 
rgt, on=["id", "k"]) + res.columns = range(res.columns.size) + return res + + self._test_merge_error( + fn=merge_pandas, + error_class=PythonException, + error_message_regex="Number of columns of the returned pandas.DataFrame " + "doesn't match specified schema. Expected: 4 Actual: 6 Schema: id, k, v, v2\n", ) - left = left.toPandas() - right = right.toPandas() - - expected = pd.merge( - left[left["id"] % 2 != 0], right[right["id"] % 3 != 0], on=["id", "k"] - ).sort_values(by=["id", "k"]) + # with very large schema the schema is abbreviated in the error message + schema = "id long, k long, mean double, " + ", ".join( + f"column_with_long_column_name_{no} integer" for no in range(35) + ) - assert_frame_equal(expected, result) + def fn(lft, _): + # remove column names from lft DataFrame + lft.columns = range(lft.columns.size) + return lft - def test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self): - left = self.data1 - right = self.data2 + self._test_merge_error( + fn=fn, + output_schema=schema, + error_class=PythonException, + error_message_regex="Number of columns of the returned pandas\\.DataFrame " + "doesn't match specified schema\\. 
Expected: 38 Actual: 3 " + "Schema: id, k, mean, column_with_long_column_name_0, column_with_long_column_name_1," + " column_with_long_column_name_2, column_with_long_column_name_3," + " column_with_long_column_name_4, column_with_long_column_name_5," + " column_with_long_column_name_6, column_with_long_column_name_7," + " column_with_long_column_name_8, column_with_long_column_name_9," + " column_with_long_column_name_10, column_with_long_column_name_11," + " column_with_long_column_name_12, column_with_long_column_name_13," + " column_with_long_column_name_14, column_with_\\.\\.\\.g_column_name_19," + " column_with_long_column_name_20, column_with_long_column_name_21," + " column_with_long_column_name_22, column_with_long_column_name_23," + " column_with_long_column_name_24, column_with_long_column_name_25," + " column_with_long_column_name_26, column_with_long_column_name_27," + " column_with_long_column_name_28, column_with_long_column_name_29," + " column_with_long_column_name_30, column_with_long_column_name_31," + " column_with_long_column_name_32, column_with_long_column_name_33," + " column_with_long_column_name_34\n", + ) + def test_apply_in_pandas_returning_empty_dataframe(self): def merge_pandas(lft, rgt): if 0 in lft["id"] and lft["id"][0] % 2 == 0: - return pd.DataFrame([], columns=["id", "k"]) + return pd.DataFrame() + if 0 in rgt["id"] and rgt["id"][0] % 3 == 0: + return pd.DataFrame() return pd.merge(lft, rgt, on=["id", "k"]) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "Number of columns of the returned pandas.DataFrame doesn't " - "match specified schema. 
Expected: 4 Actual: 2", - ): - ( - # merge_pandas returns two columns for even keys while we set schema to four - left.groupby("id") - .cogroup(right.groupby("id")) - .applyInPandas(merge_pandas, "id long, k int, v int, v2 int") - .collect() - ) + self._test_merge_empty(fn=merge_pandas) + + def test_apply_in_pandas_returning_incompatible_type(self): + for safely in [True, False]: + with self.subTest(convertToArrowArraySafely=safely), self.sql_conf( + {"spark.sql.execution.pandas.convertToArrowArraySafely": safely} + ), QuietTest(self.sc): + # sometimes we see ValueErrors + with self.subTest(convert="string to double"): + expected = ( + "ValueError: Exception thrown when converting pandas\\.Series \\(object\\) " + "with name 'k' to Arrow Array \\(double\\)\\." Review Comment: done ########## python/pyspark/worker.py: ########## @@ -159,27 +226,13 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se key_series = left_key_series if not left_df.empty else right_key_series key = tuple(s[0] for s in key_series) result = f(key, left_df, right_df) - if not isinstance(result, pd.DataFrame): - raise TypeError( - "Return type of the user-defined function should be " - "pandas.DataFrame, but is {}".format(type(result)) - ) - # the number of columns of result have to match the return type - # but it is fine for result to have no columns at all if it is empty - if not ( - len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty - ): - raise RuntimeError( - "Number of columns of the returned pandas.DataFrame " - "doesn't match specified schema. " - "Expected: {} Actual: {}".format(len(return_type), len(result.columns)) - ) + verify_pandas_result(result, return_type, assign_cols_by_name(runner_conf)) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org