[ 
https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng resolved SPARK-26412.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24643
[https://github.com/apache/spark/pull/24643]

> Allow Pandas UDF to take an iterator of pd.DataFrames
> -----------------------------------------------------
>
>                 Key: SPARK-26412
>                 URL: https://issues.apache.org/jira/browse/SPARK-26412
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Weichen Xu
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Pandas UDF is the ideal connection between PySpark and DL model inference 
> workload. However, user needs to load the model file first to make 
> predictions. It is common to see models of size ~100MB or bigger. If the 
> Pandas UDF execution is limited to each batch, user needs to repeatedly load 
> the same model for every batch in the same python worker process, which is 
> inefficient.
> We can provide users the iterator of batches in pd.DataFrame and let user 
> code handle it:
> {code}
> @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
> def predict(batch_iter):
>   model = ... # load model
>   for batch in batch_iter:
>     yield model.predict(batch)
> {code}
> The type of each batch is:
> * a pd.Series if UDF is called with a single non-struct-type column
> * a tuple of pd.Series if UDF is called with more than one Spark DF columns
> * a pd.DataFrame if UDF is called with a single StructType column
> Examples:
> {code}
> @pandas_udf(...)
> def evaluate(batch_iter):
>   model = ... # load model
>   for features, label in batch_iter:
>     pred = model.predict(features)
>     yield (pred - label).abs()
> df.select(evaluate(col("features"), col("label")).alias("err"))
> {code}
> {code}
> @pandas_udf(...)
> def evaluate(pdf_iter):
>   model = ... # load model
>   for pdf in pdf_iter:
>     pred = model.predict(pdf['x'])
>     yield (pred - pdf['y']).abs()
> df.select(evaluate(struct(col("features"), col("label"))).alias("err"))
> {code}
> If the UDF doesn't return the same number of records for the entire 
> partition, user should see an error. We don't restrict that every yield 
> should match the input batch size.
> Another benefit is with iterator interface and asyncio from Python, it is 
> flexible for users to implement data pipelining.
> cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to