Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485435 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): + arrow_return_type = to_arrow_type(return_type) + + def wrapped(begin_index, end_index, *series): + import numpy as np + import pandas as pd + result = [] + for i in range(0, len(begin_index)): + begin = begin_index[i] + end = end_index[i] + range_index = np.arange(begin, end) + # Note: Creating a slice from a series is actually pretty expensive to + # do for each window. However, there is no way to reduce/eliminate + # the cost of creating sub series here AFAIK. + # TODO: s.take might be the best way to create sub series + series_slices = [s.take(range_index) for s in series] + result.append(f(*series_slices)) + return pd.Series(result) + + return lambda *a: (wrapped(*a), arrow_return_type) + + +def wrap_bounded_window_agg_pandas_udf_np(f, return_type): --- End diff -- Let's get rid of it then. Looks like we're going to make it a separate one.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org