Yicong-Huang opened a new pull request, #55527:
URL: https://github.com/apache/spark/pull/55527

   ### What changes were proposed in this pull request?
   
   Follow-up to SPARK-55726. Fixes the mock wire-protocol writer in 
`python/benchmarks/bench_eval_type.py` so that all batches of a single 
DataFrame are written as one Arrow IPC stream, matching the real worker wire 
protocol where a large group may be split across multiple RecordBatches by 
``spark.sql.execution.arrow.maxRecordsPerBatch`` but still arrives in one 
stream.
   
   Concretely:
   - `MockProtocolWriter.write_grouped_data_payload` now infers `num_dfs` from 
the group tuple length and writes each DataFrame's batches into a single Arrow 
IPC stream.
   - `MockDataFactory.make_grouped_batches` now returns 
`list[tuple[list[pa.RecordBatch]]]` (each group is a 1-tuple; the tuple element 
is the list of batches for that group's DataFrame).
   - `MockDataFactory.make_cogrouped_batches` now returns 
`list[tuple[list[pa.RecordBatch], list[pa.RecordBatch]]]` (a 2-tuple of 
left/right batches lists).
   - All callers updated: dropped the now-redundant `num_dfs=` argument and 
adjusted the one `groups[0][0]` access that reached into a RecordBatch to 
`groups[0][0][0]`.
   
   ### Why are the changes needed?
   
   The previous writer walked each group tuple and wrote every batch as its own 
Arrow IPC stream, while declaring `num_dfs=1` upfront. That worked when every 
group happened to have exactly one batch, but failed as soon as a group was 
split by the batch-size cap. In particular, the `lg_grp_few_col` / 
`lg_grp_many_col` scenarios in `GroupedMapPandasUDF{Time,Peakmem}Bench` (100K 
rows per group, default `MAX_RECORDS_PER_BATCH=10_000` → 10 batches/group) 
throw:
   
   ```
   pyspark.errors.exceptions.base.PySparkValueError:
     [INVALID_NUMBER_OF_DATAFRAMES_IN_GROUP] Invalid number of dataframes in 
group 1208025088.
   ```
   
   because the worker reads the next stream's bytes as the next group's 
`num_dfs` int. Other grouped/cogrouped benchmarks only avoided the bug by 
forcing `batch_size=rows_per_group`, but that also hid the real 
multi-batch-per-group wire shape. After this change the mock writer matches 
what worker.py actually expects, and `lg_grp_*` scenarios run green.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. Python microbenchmarks only.
   
   ### How was this patch tested?
   
   Ran all 5 `_GroupedMapPandasBenchMixin` scenarios across all 3 UDFs (15 
combinations) locally against `upstream/master` + this patch — all pass. 
Smoke-tested one scenario for every other grouped/cogrouped bench class 
(Cogrouped, GroupedAgg, GroupedAggIter, GroupedMapArrow, GroupedMapArrowIter, 
WindowAgg) — all pass.
   
   Before this fix, on `upstream/master` 
`GroupedMapPandasUDFTimeBench.time_worker("lg_grp_few_col", "identity_udf")` 
raises `INVALID_NUMBER_OF_DATAFRAMES_IN_GROUP`; after this fix it runs.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to