Yicong-Huang opened a new pull request, #55527:
URL: https://github.com/apache/spark/pull/55527
### What changes were proposed in this pull request?
Follow-up to SPARK-55726. Fixes the mock wire-protocol writer in
`python/benchmarks/bench_eval_type.py` so that all batches of a single
DataFrame are written as one Arrow IPC stream, matching the real worker wire
protocol where a large group may be split across multiple RecordBatches by
`spark.sql.execution.arrow.maxRecordsPerBatch` but still arrives in one
stream.
Concretely:
- `MockProtocolWriter.write_grouped_data_payload` now infers `num_dfs` from
the group tuple length and writes each DataFrame's batches into a single Arrow
IPC stream.
- `MockDataFactory.make_grouped_batches` now returns
`list[tuple[list[pa.RecordBatch]]]` (each group is a 1-tuple; the tuple element
is the list of batches for that group's DataFrame).
- `MockDataFactory.make_cogrouped_batches` now returns
`list[tuple[list[pa.RecordBatch], list[pa.RecordBatch]]]` (each group is a
2-tuple of the left and right DataFrames' batch lists).
- All callers updated: dropped the now-redundant `num_dfs=` argument and
adjusted the one `groups[0][0]` access that reached into a RecordBatch to
`groups[0][0][0]`.
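
The new return shapes can be illustrated with a minimal sketch. It uses plain strings as stand-ins for `pa.RecordBatch` so it runs without pyarrow, and `plan_group_writes` is a hypothetical helper invented for illustration, not a function in `bench_eval_type.py`:

```python
# Stand-in for pa.RecordBatch (assumption: strings keep the sketch dependency-free).
Batch = str


def plan_group_writes(group):
    """Return (num_dfs, batches_per_stream) for one group tuple.

    num_dfs is inferred from the tuple length, and each tuple element
    (a list of batches) maps to exactly one stream.
    """
    num_dfs = len(group)
    return num_dfs, [len(batches) for batches in group]


# Shape of make_grouped_batches: each group is a 1-tuple of a batch list.
grouped = [(["g0_b0", "g0_b1"],), (["g1_b0"],)]
# Shape of make_cogrouped_batches: each group is a 2-tuple (left, right).
cogrouped = [(["l0", "l1"], ["r0"])]

grouped_plan = [plan_group_writes(g) for g in grouped]
cogrouped_plan = [plan_group_writes(g) for g in cogrouped]

# Reaching a single batch now takes three indices: group, DataFrame, batch.
first_batch = grouped[0][0][0]
```

Here `grouped[0][0]` is the batch list of the first group's only DataFrame, which is why the former `groups[0][0]` access needed one more index.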
### Why are the changes needed?
The previous writer walked each group tuple and wrote every batch as its own
Arrow IPC stream, while declaring `num_dfs=1` upfront. That worked when every
group happened to have exactly one batch, but failed as soon as a group was
split by the batch-size cap. In particular, the `lg_grp_few_col` /
`lg_grp_many_col` scenarios in `GroupedMapPandasUDF{Time,Peakmem}Bench` (100K
rows per group, default `MAX_RECORDS_PER_BATCH=10_000` → 10 batches/group)
throw:
```
pyspark.errors.exceptions.base.PySparkValueError:
[INVALID_NUMBER_OF_DATAFRAMES_IN_GROUP] Invalid number of dataframes in
group 1208025088.
```
because the worker reads the next stream's bytes as the next group's
`num_dfs` int. Other grouped/cogrouped benchmarks only avoided the bug by
forcing `batch_size=rows_per_group`, but that also hid the real
multi-batch-per-group wire shape. After this change the mock writer matches
what `worker.py` actually expects, and the `lg_grp_*` scenarios pass.
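
The misalignment described above can be modeled with a small sketch. It is a simplified token-level simulation, not the real wire format: the `("num_dfs", n)` and `("stream", k)` tokens and all function names are hypothetical, and the reader loop only approximates how the worker alternates between reading a `num_dfs` int and reading that many streams.

```python
import math


def batches_per_group(rows_per_group, max_records_per_batch):
    # A group is split into ceil(rows / cap) RecordBatches.
    return math.ceil(rows_per_group / max_records_per_batch)


def old_layout(num_batches):
    # Previous writer: declares num_dfs=1, then emits one stream per batch.
    return [("num_dfs", 1)] + [("stream", 1)] * num_batches


def new_layout(num_batches):
    # Fixed writer: one stream per DataFrame, holding all of its batches.
    return [("num_dfs", 1), ("stream", num_batches)]


def reader_stays_aligned(tokens):
    """Simulate a reader that expects num_dfs followed by num_dfs streams."""
    pos = 0
    while pos < len(tokens):
        kind, value = tokens[pos]
        if kind != "num_dfs":
            # Stream bytes would be misread as the next group's num_dfs int.
            return False
        pos += 1
        for _ in range(value):
            if pos >= len(tokens) or tokens[pos][0] != "stream":
                return False
            pos += 1
    return True


n = batches_per_group(100_000, 10_000)
old_ok = reader_stays_aligned(old_layout(n))
new_ok = reader_stays_aligned(new_layout(n))
single_batch_old_ok = reader_stays_aligned(old_layout(1))
```

With 100K rows per group and a 10,000-row cap, `n` is 10, so the old layout leaves nine unread streams where the reader expects an int. With a single batch per group the old layout stays aligned, which is consistent with the other benchmarks passing only when `batch_size=rows_per_group`.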
### Does this PR introduce _any_ user-facing change?
No. Python microbenchmarks only.
### How was this patch tested?
Ran all 5 `_GroupedMapPandasBenchMixin` scenarios across all 3 UDFs (15
combinations) locally against `upstream/master` + this patch — all pass.
Smoke-tested one scenario for every other grouped/cogrouped bench class
(Cogrouped, GroupedAgg, GroupedAggIter, GroupedMapArrow, GroupedMapArrowIter,
WindowAgg) — all pass.
Before this fix, on `upstream/master`
`GroupedMapPandasUDFTimeBench.time_worker("lg_grp_few_col", "identity_udf")`
raises `INVALID_NUMBER_OF_DATAFRAMES_IN_GROUP`; after this fix it runs.
### Was this patch authored or co-authored using generative AI tooling?
No.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]