Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22305#discussion_r232485435
  
    --- Diff: python/pyspark/worker.py ---
    @@ -154,6 +154,47 @@ def wrapped(*series):
         return lambda *a: (wrapped(*a), arrow_return_type)
     
     
    +def wrap_bounded_window_agg_pandas_udf(f, return_type):
    +    arrow_return_type = to_arrow_type(return_type)
    +
    +    def wrapped(begin_index, end_index, *series):
    +        import numpy as np
    +        import pandas as pd
    +        result = []
    +        for i in range(0, len(begin_index)):
    +            begin = begin_index[i]
    +            end = end_index[i]
    +            range_index = np.arange(begin, end)
    +            # Note: Create a slice from a series is actually pretty expensive to
    +            #       do for each window. However, there is no way to reduce/eliminate
    +            #       the cost of creating sub series here AFAIK.
    +            # TODO: s.take might be the best way to create sub series
    +            series_slices = [s.take(range_index) for s in series]
    +            result.append(f(*series_slices))
    +        return pd.Series(result)
    +
    +    return lambda *a: (wrapped(*a), arrow_return_type)
    +
    +
    +def wrap_bounded_window_agg_pandas_udf_np(f, return_type):
    --- End diff --
    
    Let's get rid of it then. Looks like we're going to make it a separate one.
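    
    For context, here is a minimal standalone sketch of the per-window slicing pattern used in `wrap_bounded_window_agg_pandas_udf` above. The `bounded_window_agg` helper, the `begin`/`end` index arrays, and the `mean` aggregation are hypothetical stand-ins for illustration, not part of the actual patch:
    
    ```python
    import numpy as np
    import pandas as pd
    
    def bounded_window_agg(f, begin_index, end_index, series):
        # For each output row i, take the [begin, end) slice of the input
        # series via Series.take (as the diff does) and apply f to it.
        result = []
        for begin, end in zip(begin_index, end_index):
            window = series.take(np.arange(begin, end))
            result.append(f(window))
        return pd.Series(result)
    
    s = pd.Series([1.0, 2.0, 3.0, 4.0])
    # A trailing window of at most size 2 ending at each row.
    begin = np.array([0, 0, 1, 2])
    end = np.array([1, 2, 3, 4])
    print(bounded_window_agg(lambda w: w.mean(), begin, end, s).tolist())
    # → [1.0, 1.5, 2.5, 3.5]
    ```
    
    As the diff's note says, materializing a sub-series per window is the expensive part; `Series.take` with a precomputed index array is one way to keep that slicing cost down.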


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
