zhengruifeng commented on code in PR #55532:
URL: https://github.com/apache/spark/pull/55532#discussion_r3146281041


##########
python/pyspark/worker.py:
##########
@@ -2561,8 +2558,8 @@ def func(split_index: int, data: Iterator[pa.RecordBatch]) -> Iterator[pa.Record
             output_batches = udf_func(input_batches)
 
             # Post-processing
-            verified: Iterator[pa.RecordBatch] = verify_result(pa.RecordBatch)(output_batches)
-            yield from map(ArrowBatchTransformer.wrap_struct, verified)
+            verified_iter = verify_return_type(output_batches, Iterator[pa.RecordBatch])

Review Comment:
   The behavior change called out in the PR description — "a UDF returning a 
non-`Iterator` iterable (e.g. list) is now rejected with `UDF_RETURN_TYPE`" — 
isn't pinned by a test. `test_other_than_recordbatch_iter` covers (a) returning 
a non-iterable (`int`) and (b) returning an iterator with wrong elements, but 
not (c) returning an iterable-but-not-iterator. Adding a case where the UDF 
returns `[pa.RecordBatch.from_pandas(...)]` (a list, not an iterator) and 
asserting it raises `UDF_RETURN_TYPE` would lock in the documented tightening 
against future regressions in either direction.
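
For reference, case (c) turns on the difference between an iterable and an iterator. A minimal stdlib-only sketch of that distinction follows; `is_iterator` is a hypothetical helper and the strings are stand-ins for `pa.RecordBatch` objects, so this is not the PySpark test itself:

```python
from collections.abc import Iterator


def is_iterator(value):
    """Hypothetical helper: True only for real iterators, not plain iterables."""
    return isinstance(value, Iterator)


# Stand-ins for pa.RecordBatch objects (assumption, to keep the sketch stdlib-only).
batches = ["batch1", "batch2"]


def gen():
    # A generator is an iterator: it defines both __iter__ and __next__.
    yield from batches


list_ok = is_iterator(batches)        # a list is iterable but has no __next__
iter_ok = is_iterator(iter(batches))  # list_iterator is an iterator
gen_ok = is_iterator(gen())           # generator objects are iterators
```

A real regression test would presumably return the list from the UDF and assert on the `UDF_RETURN_TYPE` error class, following the pattern already used in `test_other_than_recordbatch_iter`.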



##########
python/pyspark/worker.py:
##########
@@ -2926,7 +2926,8 @@ def grouped_func(
 
                 # Verify, reorder, and wrap each output batch
                 for batch in result:
-                    verify_arrow_batch(
+                    verify_return_type(batch, pa.RecordBatch)

Review Comment:
   The PR description frames this as "aligning runtime with the documented 
`Iterator[...]` signatures", but `SQL_GROUPED_MAP_ARROW_ITER_UDF` here still 
iterates `result` as a generic iterable — only individual batches are 
type-checked. A grouped-iter UDF returning `[batch1, batch2]` (a list) is still 
accepted, while a `mapInArrow` UDF returning the same shape is now rejected, even 
though `typehints.py` declares both as `Iterator[pa.RecordBatch]`. Was the 
partial scope intentional? If consistency with the declared contract is the 
goal, wrapping `result` once with `verify_return_type(result, 
Iterator[pa.RecordBatch])` and iterating the returned wrapper would tighten 
this path the same way and let you drop the per-element 
`verify_return_type(batch, pa.RecordBatch)` inside the loop.
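
To illustrate the "wrap once, iterate the wrapper" shape suggested above, here is a rough sketch. `verify_iterator` and `FakeBatch` are hypothetical stand-ins (not the PR's `verify_return_type` or `pa.RecordBatch`), and plain `TypeError` stands in for `PySparkTypeError`:

```python
from collections.abc import Iterator as AbcIterator
from typing import Any, Iterator, Type, TypeVar

T = TypeVar("T")


class FakeBatch:
    """Stand-in for pa.RecordBatch so the sketch needs no pyarrow."""


def verify_iterator(result: Any, element_type: Type[T]) -> Iterator[T]:
    # Eager check: reject non-iterators (e.g. a list) before any consumption.
    if not isinstance(result, AbcIterator):
        raise TypeError(f"expected an iterator, got {type(result).__name__}")

    def checked() -> Iterator[T]:
        # Lazy check: each element is verified only when it is consumed.
        for item in result:
            if not isinstance(item, element_type):
                raise TypeError(
                    f"expected {element_type.__name__}, got {type(item).__name__}"
                )
            yield item

    return checked()


# The grouped path would then iterate the wrapper instead of the raw result.
good = list(verify_iterator(iter([FakeBatch(), FakeBatch()]), FakeBatch))
```

With this shape the loop body no longer needs its own per-element check, because the wrapper raises at the point the bad element is pulled.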



##########
python/pyspark/worker.py:
##########
@@ -234,46 +248,56 @@ def chain(f, g):
     return lambda *a: g(f(*a))
 
 
-def verify_result(expected_type: type) -> Callable[[Any], Iterator]:
-    """
-    Create a result verifier that checks both iterability and element types.
+@overload
+def verify_return_type(result: Any, expected_type: Type[T]) -> T: ...
 
-    Returns a function that takes a UDF result, verifies it is iterable,
-    and lazily type-checks each element via map.
 
-    Parameters
-    ----------
-    expected_type : type
-        The expected Python/PyArrow type for each element
-        (e.g. pa.RecordBatch, pa.Array).
+@overload
+def verify_return_type(result: Any, expected_type: Any) -> Any: ...
+
+
+def verify_return_type(result: Any, expected_type: Any) -> Any:
     """
+    Verify a UDF return value against an expected type.
 
-    package = getattr(inspect.getmodule(expected_type), "__package__", "")
-    label: str = f"{package}.{expected_type.__name__}"
+    Returns ``result`` unchanged if ``isinstance(result, expected_type)``.
+    For ``Iterator[T]``, returns a lazy iterator that checks each element
+    against ``T`` on consumption. Raises ``PySparkTypeError`` on mismatch.
+    """
+    if getattr(expected_type, "_name", None) == "Iterator":

Review Comment:
   Minor / forward-looking: `getattr(expected_type, "_name", None) == 
"Iterator"` matches `typing.Iterator[T]` but not `collections.abc.Iterator[T]` 
(PEP 585 form is a `types.GenericAlias` and has no `_name`). A future caller 
writing `from collections.abc import Iterator` would silently fall through to 
the concrete-type branch, then `isinstance(result, Iterator[T])` would raise 
`TypeError: isinstance() argument 2 cannot be a parameterized generic` — 
confusing relative to the actual mistake. `typing.get_origin(expected_type) is 
collections.abc.Iterator` handles both forms, or a one-line docstring caveat 
that the helper expects the `typing.X` form would suffice. All current callers 
use `from typing import Iterator`, so this is forward-looking only.
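
A small sketch of the `typing.get_origin` alternative, assuming Python 3.9+ so that `collections.abc.Iterator[int]` is subscriptable; `is_iterator_hint` is a hypothetical name, not something in the PR:

```python
import collections.abc
import typing


def is_iterator_hint(expected_type):
    """Hypothetical check: True for Iterator[...] in either spelling."""
    return typing.get_origin(expected_type) is collections.abc.Iterator


typing_form = typing.Iterator[int]        # typing generic alias, has _name
abc_form = collections.abc.Iterator[int]  # PEP 585 types.GenericAlias

# The _name-based check only recognizes the typing spelling.
name_check_typing = getattr(typing_form, "_name", None) == "Iterator"
name_check_abc = getattr(abc_form, "_name", None) == "Iterator"

# The get_origin-based check recognizes both spellings.
origin_check_typing = is_iterator_hint(typing_form)
origin_check_abc = is_iterator_hint(abc_form)
```

Non-generic types such as `int` have no origin, so `is_iterator_hint(int)` is `False` and they would still reach the concrete-type branch.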




