zhengruifeng commented on code in PR #55532:
URL: https://github.com/apache/spark/pull/55532#discussion_r3146281041
##########
python/pyspark/worker.py:
##########
@@ -2561,8 +2558,8 @@ def func(split_index: int, data: Iterator[pa.RecordBatch]) -> Iterator[pa.Record
output_batches = udf_func(input_batches)
# Post-processing
- verified: Iterator[pa.RecordBatch] = verify_result(pa.RecordBatch)(output_batches)
- yield from map(ArrowBatchTransformer.wrap_struct, verified)
+ verified_iter = verify_return_type(output_batches, Iterator[pa.RecordBatch])
Review Comment:
The behavior change called out in the PR description — "a UDF returning a
non-`Iterator` iterable (e.g. list) is now rejected with `UDF_RETURN_TYPE`" —
isn't pinned by a test. `test_other_than_recordbatch_iter` covers (a) returning
a non-iterable (`int`) and (b) returning an iterator with wrong elements, but
not (c) returning an iterable-but-not-iterator. Adding a case where the UDF
returns `[pa.RecordBatch.from_pandas(...)]` (a list, not an iterator) and
asserting it raises `UDF_RETURN_TYPE` would lock in the documented tightening
against future regressions in either direction.
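To make case (c) concrete, here is a minimal sketch of how the three return shapes differ under the standard `collections.abc` checks. `FakeBatch` and `classify` are hypothetical names used only for illustration (a stand-in for `pa.RecordBatch` so the snippet runs without pyarrow or Spark); this is not the PySpark test itself.

```python
from collections.abc import Iterable, Iterator


class FakeBatch:
    """Hypothetical stand-in for pa.RecordBatch (assumption, not pyarrow)."""


def classify(result):
    """Label a UDF return value by its iteration protocol only."""
    if not isinstance(result, Iterable):
        return "non-iterable"
    if not isinstance(result, Iterator):
        # Has __iter__ but no __next__, e.g. list or tuple.
        return "iterable-but-not-iterator"
    return "iterator"


case_a = classify(1)                # (a) non-iterable
case_b = classify(iter([1, 2]))     # (b) iterator; elements still need checking
case_c = classify([FakeBatch()])    # (c) list: the shape the new test should pin
```

A real test would return the list from the UDF passed to `mapInArrow` and assert on the `UDF_RETURN_TYPE` error class, following the existing cases in `test_other_than_recordbatch_iter`.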
##########
python/pyspark/worker.py:
##########
@@ -2926,7 +2926,8 @@ def grouped_func(
# Verify, reorder, and wrap each output batch
for batch in result:
- verify_arrow_batch(
+ verify_return_type(batch, pa.RecordBatch)
Review Comment:
The PR description frames this as "aligning runtime with the documented
`Iterator[...]` signatures", but `SQL_GROUPED_MAP_ARROW_ITER_UDF` here still
iterates `result` as a generic iterable — only individual batches are
type-checked. A grouped-iter UDF returning `[batch1, batch2]` (list) is still
accepted, while a `mapInArrow` UDF returning the same shape is rejected, even
though `typehints.py` declares both as `Iterator[pa.RecordBatch]`. Was the
partial scope intentional? If consistency with the declared contract is the
goal, wrapping `result` once with `verify_return_type(result,
Iterator[pa.RecordBatch])` and iterating the returned wrapper would tighten
this path the same way and let you drop the per-element
`verify_return_type(batch, pa.RecordBatch)` inside the loop.
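A rough sketch of the "wrap once, iterate the wrapper" shape, assuming a helper that rejects non-iterators eagerly and checks elements lazily. `checked_iter` is a hypothetical name, and it raises plain `TypeError` rather than `PySparkTypeError`; the actual `verify_return_type` in this PR may differ in detail.

```python
from collections.abc import Iterator as AbcIterator
from typing import Any, Iterator, Type, TypeVar

T = TypeVar("T")


def checked_iter(result: Any, elem_type: Type[T]) -> Iterator[T]:
    """Hypothetical helper: eager container check, lazy element checks."""
    # Eager: a list or tuple fails here, before any element is consumed.
    if not isinstance(result, AbcIterator):
        raise TypeError(f"expected an iterator, got {type(result).__name__}")

    def gen() -> Iterator[T]:
        # Lazy: each element is checked only when the caller pulls it.
        for item in result:
            if not isinstance(item, elem_type):
                raise TypeError(
                    f"expected {elem_type.__name__}, got {type(item).__name__}"
                )
            yield item

    return gen()


# The loop body no longer needs its own per-element type check.
collected = [x for x in checked_iter(iter([1, 2, 3]), int)]
```

The inner generator is what keeps the container check eager: if `checked_iter` itself were a generator function, the `isinstance` test on `result` would not run until the first element was requested.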
##########
python/pyspark/worker.py:
##########
@@ -234,46 +248,56 @@ def chain(f, g):
return lambda *a: g(f(*a))
-def verify_result(expected_type: type) -> Callable[[Any], Iterator]:
- """
- Create a result verifier that checks both iterability and element types.
+@overload
+def verify_return_type(result: Any, expected_type: Type[T]) -> T: ...
- Returns a function that takes a UDF result, verifies it is iterable,
- and lazily type-checks each element via map.
- Parameters
- ----------
- expected_type : type
- The expected Python/PyArrow type for each element
- (e.g. pa.RecordBatch, pa.Array).
+@overload
+def verify_return_type(result: Any, expected_type: Any) -> Any: ...
+
+
+def verify_return_type(result: Any, expected_type: Any) -> Any:
"""
+ Verify a UDF return value against an expected type.
- package = getattr(inspect.getmodule(expected_type), "__package__", "")
- label: str = f"{package}.{expected_type.__name__}"
+ Returns ``result`` unchanged if ``isinstance(result, expected_type)``.
+ For ``Iterator[T]``, returns a lazy iterator that checks each element
+ against ``T`` on consumption. Raises ``PySparkTypeError`` on mismatch.
+ """
+ if getattr(expected_type, "_name", None) == "Iterator":
Review Comment:
Minor / forward-looking: `getattr(expected_type, "_name", None) ==
"Iterator"` matches `typing.Iterator[T]` but not `collections.abc.Iterator[T]`
(PEP 585 form is a `types.GenericAlias` and has no `_name`). A future caller
writing `from collections.abc import Iterator` would silently fall through to
the concrete-type branch, then `isinstance(result, Iterator[T])` would raise
`TypeError: isinstance() argument 2 cannot be a parameterized generic` —
confusing relative to the actual mistake. `typing.get_origin(expected_type) is
collections.abc.Iterator` handles both forms, or a one-line docstring caveat
that the helper expects the `typing.X` form would suffice. All current callers
use `from typing import Iterator`, so this is forward-looking only.
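A small standalone sketch of the difference, assuming Python 3.9+ (where `collections.abc.Iterator[T]` is subscriptable). `is_iterator_hint` is a hypothetical helper name, not something in `worker.py`.

```python
import collections.abc
import typing


def is_iterator_hint(expected_type):
    """Hypothetical check: True for both the typing and the PEP 585 form."""
    return typing.get_origin(expected_type) is collections.abc.Iterator


typing_form = typing.Iterator[int]
pep585_form = collections.abc.Iterator[int]

# The `_name` check used in the PR only recognizes the typing form.
name_check = [
    getattr(t, "_name", None) == "Iterator" for t in (typing_form, pep585_form)
]

# The origin check recognizes both forms.
origin_check = [is_iterator_hint(t) for t in (typing_form, pep585_form)]

# Falling through to isinstance() with the PEP 585 form raises TypeError.
try:
    isinstance(iter([]), pep585_form)
    fallthrough_error = None
except TypeError as exc:
    fallthrough_error = exc
```

Here `name_check` comes out as `[True, False]` and `origin_check` as `[True, True]`, which is the gap described above.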