Michael Tong created SPARK-42645:
------------------------------------

             Summary: Introduce feature to allow for function caching across input rows.
                 Key: SPARK-42645
                 URL: https://issues.apache.org/jira/browse/SPARK-42645
             Project: Spark
          Issue Type: Wish
          Components: Optimizer
    Affects Versions: 3.3.2
            Reporter: Michael Tong


Introduce the ability to make functions cacheable across input rows. I imagine this working similarly to Python's [functools.cache|https://docs.python.org/3/library/functools.html#functools.cache], where you add a decorator to certain expensive functions that you know will regularly encounter repeated values as they read the input data.
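
For reference, this is how functools.cache behaves in plain Python (available since Python 3.9; the parsing body below is just a placeholder for an expensive computation):
{code:python}
import functools

@functools.cache
def expensive_parse(value):
    # the body runs only once per distinct input;
    # repeated inputs are answered from the cache
    print(f"computing for {value!r}")
    return value.lower()

expensive_parse("Mozilla/5.0")  # computed
expensive_parse("Mozilla/5.0")  # cache hit, body not re-run
{code}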

With this new feature you could significantly speed up many real-world jobs that apply expensive functions to data with naturally repeated column values. An example would be parsing user agent fields from internet traffic logs partitioned by user id. Even though the data is not sorted by user agent, a sample of 10k consecutive rows would contain far fewer than 10k unique values, because popular user agents appear on a large fraction of traffic and the user agent of a user's first event is likely to be shared by all of their subsequent events. Currently you can hack an approximation of this in Python via pandas_udfs: because pandas_udfs read input in batches of 10k rows by default, you can use a caching UDF whose cache empties every 10k rows. At my current job I have noticed that this trick can significantly speed up queries where custom UDFs are the bottleneck. An example of this is

{code:python}
import functools

import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.pandas_udf(T.StringType())
def parse_user_agent_field(user_agent_series):
    # the cache is rebuilt for each batch, so it empties every ~10k rows
    @functools.cache
    def parse_user_agent_field_helper(user_agent):
        # parse the user agent and return the relevant field
        return None
    return user_agent_series.apply(parse_user_agent_field_helper)
{code}
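
For context, the UDF above is then applied like any other column expression (the column names here are hypothetical):
{code:python}
# assumes a DataFrame df with a "user_agent" column
df = df.withColumn("ua_field", parse_user_agent_field(F.col("user_agent")))
{code}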

It would be nice if there were official support for this behavior for both built-in functions and UDFs. If there were, I'd imagine it looking something like

{code:python}
# using the PySpark DataFrame API
df = df.withColumn(output_col, F.cache(F.function)(input_col))
{code}
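
Until something like F.cache exists, the per-batch trick above could be generalized into a reusable decorator. A rough sketch (the cached_udf helper and parse_ua_field are hypothetical names, not existing APIs; the cache still resets on every batch):
{code:python}
import functools

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

def cached_udf(return_type):
    # hypothetical helper, not an existing Spark API: wraps a scalar
    # Python function in a pandas_udf that caches results within a batch
    def decorator(fn):
        @F.pandas_udf(return_type)
        def wrapper(series: pd.Series) -> pd.Series:
            cached_fn = functools.cache(fn)  # fresh cache for each batch
            return series.apply(cached_fn)
        return wrapper
    return decorator

@cached_udf(T.StringType())
def parse_ua_field(user_agent):
    # stand-in for real user agent parsing logic
    return None
{code}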