Yicong-Huang opened a new pull request, #55750:
URL: https://github.com/apache/spark/pull/55750
### What changes were proposed in this pull request?

Consolidate the `SQL_MAP_PANDAS_ITER_UDF` (`mapInPandas`) execution path so that input transformation, UDF invocation, result verification, and output transformation all live in a single block in `read_udfs()`. This matches the patterns established by [SPARK-55389](https://issues.apache.org/jira/browse/SPARK-55389) (`SQL_MAP_ARROW_ITER_UDF`) and [SPARK-56691](https://issues.apache.org/jira/browse/SPARK-56691) (`SQL_GROUPED_MAP_PANDAS_ITER_UDF`).

- Stop wrapping the UDF in `wrap_pandas_batch_iter_udf` for `MAP_PANDAS_ITER`. `read_single_udf` now returns `(func, return_type)` for this eval type.
- Switch the serializer from `ArrowStreamPandasUDFSerializer` to `ArrowStreamSerializer(write_start_stream=True)`, so the wrapper receives a raw `Iterator[pa.RecordBatch]` and owns the Arrow<->pandas conversion.
- Add a dedicated `SQL_MAP_PANDAS_ITER_UDF` branch in `read_udfs`. This branch lazily converts each batch's struct column to a `pd.DataFrame`, so peak memory stays bounded by a single batch. It then feeds the iterator to the UDF, verifies each yielded DataFrame against the return schema, and converts the result back to Arrow via `PandasToArrowConversion.convert`.
- Drop `MAP_PANDAS_ITER` from the shared scalar-iter mapper. The `is_scalar_iter`/`is_map_pandas_iter` split collapses to a single `SCALAR_PANDAS_ITER` branch.
- Make `verify_result` use only the top-level package name (e.g. `pandas.core` -> `pandas`), so the user-visible label remains `pandas.DataFrame` rather than `pandas.core.DataFrame`. The labels for `pyarrow.RecordBatch` and `pyarrow.Array` are unchanged.

### Why are the changes needed?

Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388). The existing logic for this eval type was scattered across a wrapper function, a shared mapper branch, and the serializer. After this change, the full data flow for `mapInPandas` is visible in one place.

### Does this PR introduce _any_ user-facing change?

No.
### How was this patch tested?

Existing tests. No behavior change.

- `pyspark/sql/tests/pandas/test_pandas_map.py` -- 28 passed
- `pyspark/sql/tests/arrow/test_arrow_map.py` + `test_arrow_grouped_map.py` + `pyspark/sql/tests/pandas/test_pandas_udf_scalar.py` -- 176 passed, 2 skipped
- `pyspark/sql/tests/pandas/test_pandas_udf.py` + `test_pandas_udf_typehints.py` -- 36 passed

ASV `MapPandasIterUDFTimeBench` comparison (`-a repeat=3`, before = `upstream/master`, after = this branch):

```text
scenario           udf           before (ms)  after (ms)  diff (%)
sm_batch_few_col   identity_udf          311         310      -0.3
sm_batch_few_col   sort_udf              365         367      +0.5
sm_batch_few_col   filter_udf            347         341      -1.7
sm_batch_many_col  identity_udf          213         212      -0.5
sm_batch_many_col  sort_udf              231         233      +0.9
sm_batch_many_col  filter_udf            216         217      +0.5
lg_batch_few_col   identity_udf          850         765     -10.0
lg_batch_few_col   sort_udf             1030         965      -6.3
lg_batch_few_col   filter_udf            815         804      -1.3
lg_batch_many_col  identity_udf          791         787      -0.5
lg_batch_many_col  sort_udf             1290        1280      -0.8
lg_batch_many_col  filter_udf            889         811      -8.8
pure_ints          identity_udf          152         140      -7.9
pure_ints          sort_udf              168         166      -1.2
pure_ints          filter_udf            164         151      -7.9
pure_floats        identity_udf          158         139     -12.0
pure_floats        sort_udf              183         166      -9.3
pure_floats        filter_udf            179         161     -10.1
pure_strings       identity_udf          636         609      -4.2
pure_strings       sort_udf              894         799     -10.6
pure_strings       filter_udf            762         650     -14.7
pure_ts            identity_udf          267         211     -21.0
pure_ts            sort_udf              303         230     -24.1
pure_ts            filter_udf            273         233     -14.7
mixed_types        identity_udf          571         435     -23.8
mixed_types        sort_udf              668         518     -22.5
mixed_types        filter_udf            507         460      -9.3
```

### Was this patch authored or co-authored using generative AI tooling?

No.
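The `diff (%)` column in the ASV comparison appears to be the relative change `(after - before) / before * 100`. Negative values therefore mean this branch is faster. The check below recomputes that formula for a few rows copied from the table. It is a sketch under that assumed definition, and `diff_pct` and `ROWS` are illustrative names.

```python
# (scenario, udf, before_ms, after_ms, reported_diff_pct), copied from the ASV table
ROWS = [
    ("lg_batch_few_col", "identity_udf", 850, 765, -10.0),
    ("pure_ts", "sort_udf", 303, 230, -24.1),
    ("mixed_types", "identity_udf", 571, 435, -23.8),
    ("sm_batch_few_col", "sort_udf", 365, 367, 0.5),
]


def diff_pct(before_ms: float, after_ms: float) -> float:
    """Relative change of `after` versus `before`, in percent (negative = faster)."""
    return (after_ms - before_ms) / before_ms * 100.0


for scenario, udf, before, after, reported in ROWS:
    computed = diff_pct(before, after)
    # Print the recomputed value next to the reported one, both to one decimal.
    print(f"{scenario:<17} {udf:<12} {computed:+6.1f} (reported {reported:+.1f})")
```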
