Yicong-Huang opened a new pull request, #55756:
URL: https://github.com/apache/spark/pull/55756
### What changes were proposed in this pull request?
Refactor `SQL_SCALAR_PANDAS_ITER_UDF` to use `ArrowStreamSerializer` as pure
I/O, moving the iterator scalar Pandas UDF logic from
`ArrowStreamPandasUDFSerializer` into `read_udfs()` in `worker.py`.
Specifically:
- Drop `wrap_pandas_batch_iter_udf` for the scalar iter path (still used by
`SQL_MAP_PANDAS_ITER_UDF`).
- Route `SQL_SCALAR_PANDAS_ITER_UDF` through
`ArrowStreamSerializer(write_start_stream=True)`.
- In `read_udfs()`, add a self-contained handler that:
  - Streams Arrow `RecordBatch` -> pandas Series via
    `ArrowBatchTransformer.to_pandas()` (`struct_in_pandas="dict"`,
    `df_for_struct=True`, `ndarray_as_list=False`), one element per input batch.
  - Invokes the user iterator UDF with the per-batch pandas args iterator
    and validates the returned iterable.
  - For each yielded element: validates it is `pandas.DataFrame` (struct
    return type) or `pandas.Series` (otherwise), runs `verify_pandas_result`
    (column names / counts), and converts back to an Arrow `RecordBatch` via
    `PandasToArrowConversion.convert()`.
  - Reuses the iterator helpers introduced for `SQL_SCALAR_ARROW_ITER_UDF`:
    `verify_output_row_limit` (fail-fast `OUTPUT_EXCEEDS_INPUT_ROWS`),
    `verify_output_row_count` (final `RESULT_ROWS_MISMATCH`), and
    `verify_iterator_exhausted` (`INPUT_NOT_FULLY_CONSUMED`).
- Split the trailing combined `is_scalar_iter or is_map_pandas_iter` block;
`SQL_MAP_PANDAS_ITER_UDF` now lives in its own slim block (no row-count guards,
since they only applied to scalar iter).
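
For readers unfamiliar with the three row-count guards named above, here is a minimal sketch of the idea in plain Python. It is an illustration only, not the code in `worker.py`: the wrapper `checked_scalar_iter` is a hypothetical name, batches are stood in for by any object supporting `len()`, and the order of the two final checks is an assumption. Only the error names are taken from the description.

```python
from typing import Callable, Iterable, Iterator, Sequence

_DONE = object()  # sentinel marking an exhausted input iterator


def checked_scalar_iter(
    udf: Callable[[Iterator[Sequence]], Iterable[Sequence]],
    batches: Iterable[Sequence],
) -> Iterator[Sequence]:
    """Hypothetical sketch: run an iterator UDF under row-count guards."""
    rows_in = 0
    rows_out = 0

    def counting_input() -> Iterator[Sequence]:
        # Count input rows lazily, as the UDF pulls each batch.
        nonlocal rows_in
        for batch in batches:
            rows_in += len(batch)
            yield batch

    input_iter = counting_input()
    for result in udf(input_iter):
        rows_out += len(result)
        # Fail fast: output must never run ahead of the input consumed so far.
        if rows_out > rows_in:
            raise RuntimeError("OUTPUT_EXCEEDS_INPUT_ROWS")
        yield result

    # The UDF must have drained its input iterator.
    if next(input_iter, _DONE) is not _DONE:
        raise RuntimeError("INPUT_NOT_FULLY_CONSUMED")
    # Final check: total output rows must equal total input rows.
    if rows_out != rows_in:
        raise RuntimeError("RESULT_ROWS_MISMATCH")
```

Because the wrapper is itself a generator, the checks run as results are consumed, so an oversized output fails on the offending batch rather than at the end of the stream.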
### Why are the changes needed?
Part of [SPARK-55388](https://issues.apache.org/jira/browse/SPARK-55388).
This consolidates UDF dispatch, conversion, and verification logic for
`SQL_SCALAR_PANDAS_ITER_UDF` into a single inline handler in `read_udfs()`,
mirroring the pattern already applied to `SQL_SCALAR_PANDAS_UDF`
([SPARK-56648](https://issues.apache.org/jira/browse/SPARK-56648)) and
`SQL_SCALAR_ARROW_ITER_UDF`
([SPARK-55577](https://issues.apache.org/jira/browse/SPARK-55577)). The
dedicated `ArrowStreamPandasUDFSerializer` is no longer used by the scalar
pandas iter path, reducing indirection and bringing the eval-type processing
paths closer to a uniform structure.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests. No behavior change.
`pyspark.sql.tests.pandas.test_pandas_udf_scalar` plus `test_pandas_udf`,
`test_pandas_udf_typehints`, `test_pandas_udf_window`, `test_arrow_python_udf`,
and `test_pandas_map` (covering the split MAP_PANDAS_ITER block) all pass.
ASV benchmark comparison was run with `COLUMNS=120 asv run --python=same --bench
"ScalarPandasIterUDF" -a repeat=3`, where before = `upstream/master` and
after = this PR.
**ScalarPandasIterUDFTimeBench** (latency):
```text
scenario             udf             before   after    diff
-------------------  --------------  -------  -------  -------
sm_batch_few_col     identity_udf    391ms    377ms    -3.6%
sm_batch_few_col     sort_udf        537ms    519ms    -3.2%
sm_batch_few_col     nullcheck_udf   460ms    449ms    -2.5%
sm_batch_many_col    identity_udf    338ms    302ms    -10.8%
sm_batch_many_col    sort_udf        353ms    328ms    -7.2%
sm_batch_many_col    nullcheck_udf   337ms    318ms    -5.6%
lg_batch_few_col     identity_udf    1.21s    1.15s    -5.0%
lg_batch_few_col     sort_udf        1.48s    1.42s    -4.0%
lg_batch_few_col     nullcheck_udf   1.35s    1.22s    -9.0%
lg_batch_many_col    identity_udf    1.42s    1.34s    -5.7%
lg_batch_many_col    sort_udf        1.46s    1.36s    -7.0%
lg_batch_many_col    nullcheck_udf   1.43s    1.34s    -6.5%
pure_ints            identity_udf    203ms    193ms    -4.9%
pure_ints            sort_udf        291ms    274ms    -5.8%
pure_ints            nullcheck_udf   235ms    223ms    -5.4%
pure_floats          identity_udf    202ms    192ms    -4.8%
pure_floats          sort_udf        297ms    282ms    -5.0%
pure_floats          nullcheck_udf   235ms    222ms    -5.6%
pure_strings         identity_udf    1.38s    1.31s    -5.1%
pure_strings         sort_udf        1.96s    1.73s    -11.5%
pure_strings         nullcheck_udf   1.35s    1.15s    -14.7%
pure_ts              identity_udf    381ms    357ms    -6.3%
pure_ts              sort_udf        457ms    427ms    -6.4%
pure_ts              nullcheck_udf   408ms    380ms    -6.7%
mixed_types          identity_udf    713ms    673ms    -5.6%
mixed_types          sort_udf        828ms    747ms    -9.8%
mixed_types          nullcheck_udf   755ms    709ms    -6.0%
```
**ScalarPandasIterUDFPeakmemBench** (peak memory):
```text
scenario             udf             before   after    diff
-------------------  --------------  -------  -------  -------
sm_batch_few_col     identity_udf    459M     459M     -0.0%
sm_batch_few_col     sort_udf        460M     460M     -0.0%
sm_batch_few_col     nullcheck_udf   457M     457M     -0.0%
sm_batch_many_col    identity_udf    459M     459M     -0.0%
sm_batch_many_col    sort_udf        460M     460M     -0.0%
sm_batch_many_col    nullcheck_udf   459M     459M     -0.0%
lg_batch_few_col     identity_udf    591M     591M     +0.1%
lg_batch_few_col     sort_udf        588M     589M     +0.2%
lg_batch_few_col     nullcheck_udf   574M     574M     -0.0%
lg_batch_many_col    identity_udf    598M     598M     -0.0%
lg_batch_many_col    sort_udf        600M     599M     -0.0%
lg_batch_many_col    nullcheck_udf   599M     599M     +0.0%
pure_ints            identity_udf    521M     521M     -0.0%
pure_ints            sort_udf        522M     522M     -0.0%
pure_ints            nullcheck_udf   519M     519M     +0.0%
pure_floats          identity_udf    518M     518M     +0.0%
pure_floats          sort_udf        519M     520M     +0.0%
pure_floats          nullcheck_udf   519M     519M     +0.0%
pure_strings         identity_udf    538M     537M     -0.1%
pure_strings         sort_udf        539M     538M     -0.1%
pure_strings         nullcheck_udf   535M     535M     -0.1%
pure_ts              identity_udf    521M     521M     +0.0%
pure_ts              sort_udf        522M     522M     +0.0%
pure_ts              nullcheck_udf   521M     522M     +0.0%
mixed_types          identity_udf    501M     501M     -0.0%
mixed_types          sort_udf        502M     501M     -0.0%
mixed_types          nullcheck_udf   501M     501M     -0.0%
```
**Summary**: latency improves by 2.5% to 14.7% across all 27 (scenario, udf)
combinations (likely from dropped indirection and removal of the per-batch
`(result, return_type)` tuple yielding); peak memory is flat (within +/-0.2%).
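
As a reading aid for the `diff` column, the sketch below assumes it is the relative change `(after - before) / before`. The helper `relative_change` is a hypothetical name, and values recomputed from the rounded timings in the tables may differ slightly from the reported ones, which were presumably derived from unrounded measurements.

```python
def relative_change(before: float, after: float) -> float:
    """Percentage change from `before` to `after`; negative means a reduction."""
    return (after - before) / before * 100.0


# First latency row: 391ms -> 377ms
print(f"{relative_change(391, 377):+.1f}%")  # prints "-3.6%"
```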
### Was this patch authored or co-authored using generative AI tooling?
No