Yicong-Huang opened a new pull request, #55961:
URL: https://github.com/apache/spark/pull/55961
### What changes were proposed in this pull request?
Forward `prefers_large_types=runner_conf.use_large_var_types` when
constructing `expected_cols_and_types` in `python/pyspark/worker.py` for the
three Arrow map eval types:
- `SQL_GROUPED_MAP_ARROW_UDF`
- `SQL_GROUPED_MAP_ARROW_ITER_UDF`
- `SQL_COGROUPED_MAP_ARROW_UDF`
Each branch built `arrow_return_type` with the `prefers_large_types` flag
forwarded but, immediately below, built the per-field `expected_cols_and_types`
schema (used by `verify_arrow_result` to validate the returned table) by
calling `to_arrow_type(col.dataType, timezone="UTC")` for each field, omitting
the same flag. This PR adds the missing keyword argument at all six call sites:
the name-based and positional variants in each of the three branches.
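A minimal sketch of the pattern being fixed, in plain Python so it runs without Spark. `to_arrow_type_sketch` is a hypothetical stand-in that models only the string/binary behaviour described here; it is not PySpark's `to_arrow_type`, and the Arrow types are represented by their names as strings.

```python
# Hypothetical stand-in for the type conversion; NOT PySpark's to_arrow_type.
# It models only the behaviour described in this PR: string/binary become
# large_string/large_binary when prefers_large_types is set.
def to_arrow_type_sketch(spark_type: str, prefers_large_types: bool = False) -> str:
    large = {"string": "large_string", "binary": "large_binary"}
    if prefers_large_types and spark_type in large:
        return large[spark_type]
    return {"long": "int64"}.get(spark_type, spark_type)


return_fields = [("id", "long"), ("s", "string"), ("b", "binary")]
use_large_var_types = True  # models useLargeVarTypes=true

# Types carried by the returned table (the flag was already forwarded here).
actual = [
    (name, to_arrow_type_sketch(t, prefers_large_types=use_large_var_types))
    for name, t in return_fields
]

# Before the fix: the per-field expected schema omits the flag.
expected_before = [(name, to_arrow_type_sketch(t)) for name, t in return_fields]

# After the fix: the same flag is forwarded to every per-field conversion.
expected_after = [
    (name, to_arrow_type_sketch(t, prefers_large_types=use_large_var_types))
    for name, t in return_fields
]

print(expected_before == actual)  # False
print(expected_after == actual)   # True
```

The point of the sketch is only that two schemas derived from the same return type must receive the same conversion flags, otherwise they disagree whenever the flag changes the output.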
### Why are the changes needed?
When `spark.sql.execution.arrow.useLargeVarTypes=true`, fields of type
`StringType`/`BinaryType` are produced as Arrow `large_string`/`large_binary`
in the result table (because `arrow_return_type` correctly reflects that). But
the parallel `expected_cols_and_types` schema, built without the flag, still
expects regular `string`/`binary`. `verify_arrow_result` then raises a spurious
`RESULT_COLUMN_TYPES_MISMATCH`:
```
[RESULT_COLUMN_TYPES_MISMATCH] Column types of the returned data do not match
specified schema. Mismatch: column 's' (expected string, actual large_string),
column 'b' (expected binary, actual large_binary).
```
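To make the message above concrete, here is a rough sketch of a per-column comparison that yields the same mismatch list. The helper `describe_type_mismatches` is hypothetical and is not the actual `verify_arrow_result` implementation; Arrow types are represented by their names as plain strings so the snippet runs without pyarrow.

```python
def describe_type_mismatches(expected, actual):
    """Return one description per column whose type names differ.

    Both arguments are lists of (column_name, type_name) pairs in the
    same column order. Hypothetical helper for illustration only.
    """
    return [
        f"column '{name}' (expected {exp}, actual {act})"
        for (name, exp), (_, act) in zip(expected, actual)
        if exp != act
    ]


# Expected schema built without the large-types flag.
expected = [("id", "int64"), ("s", "string"), ("b", "binary")]
# Types of the table actually returned under useLargeVarTypes=true.
actual = [("id", "int64"), ("s", "large_string"), ("b", "large_binary")]

mismatches = describe_type_mismatches(expected, actual)
print(", ".join(mismatches))
```

Only the `s` and `b` columns are reported, because `id` converts to the same type regardless of the flag.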
A simple repro on master (without this fix):
```python
# Assumes an active SparkSession bound to `spark`.
spark.conf.set("spark.sql.execution.arrow.useLargeVarTypes", True)
df = spark.createDataFrame([(0, "foo", b"foo")], "id long, s string, b binary")
df.groupBy("id").applyInArrow(lambda t: t, "id long, s string, b binary").collect()
# pyspark.errors.exceptions.base.PySparkRuntimeError:
# [RESULT_COLUMN_TYPES_MISMATCH] ...
```
This is a prerequisite for SPARK-56608 (migrating the `verify_arrow_result`
checks into `enforce_schema`), since that work depends on the expected and
actual Arrow schemas matching exactly under the large-var-types config.
### Does this PR introduce _any_ user-facing change?
Yes (bug fix). `applyInArrow` (grouped and cogrouped, both iterator and
non-iterator variants) no longer raises a spurious
`RESULT_COLUMN_TYPES_MISMATCH` when
`spark.sql.execution.arrow.useLargeVarTypes=true` and the UDF returns a table
containing `StringType`/`BinaryType` columns. Behavior under the default
`useLargeVarTypes=false` is unchanged.
### How was this patch tested?
Added `test_apply_in_arrow_large_var_types` to both
`test_arrow_grouped_map.py` (covers `SQL_GROUPED_MAP_ARROW_UDF` and
`SQL_GROUPED_MAP_ARROW_ITER_UDF` via the existing `function_variations` helper)
and `test_arrow_cogrouped_map.py` (covers `SQL_COGROUPED_MAP_ARROW_UDF`). Each
subtest exercises both name-based and positional column assignment. The tests
assert that the UDF receives `large_string`/`large_binary` types and that the
round-trip equals the input via `assertDataFrameEqual`. The Spark Connect
parity test files automatically pick up the new tests through
`ApplyInArrowTestsMixin` / `CogroupedMapInArrowTestsMixin`.
Verified that the bug exists on `master` by stashing the `worker.py` fix and
re-running the new tests: both fail with the expected
`RESULT_COLUMN_TYPES_MISMATCH`. With the fix applied, both pass.
### Was this patch authored or co-authored using generative AI tooling?
No