Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222015287 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): + import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + + # Ensure return type of Pandas.Series matches the arrow return type of the user-defined + # function. Otherwise, we may produce incorrect serialized data. + # Note: for timestamp type, we only need to ensure both types are timestamp because the + # serializer will do conversion. + try: + arrow_type_of_result = pa.from_numpy_dtype(result.dtype) + both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ + pa.types.is_timestamp(arrow_return_type) + if not both_are_timestamp and arrow_return_type != arrow_type_of_result: + print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- No, but as the other print usage in `worker.py`, I think this can be seen in the worker log? This is also useful when testing in pyspark shell.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org