[ 
https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-26412:
----------------------------------
    Description: 
Pandas UDF is the ideal connection between PySpark and deep learning (DL) model 
inference workloads. However, users need to load the model file before making 
predictions, and models of ~100MB or larger are common. If Pandas UDF execution 
is limited to one batch at a time, users have to reload the same model for every 
batch in the same Python worker process, which is inefficient.

We can instead give user code an iterator of batches, each a pd.DataFrame, and 
let it handle them:

{code}
@pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITERATOR)
def predict(batch_iter):
  model = ... # load model
  for batch in batch_iter:
    yield model.predict(batch)
{code}

We might add a contract that each yielded result must have the same number of 
rows as the corresponding input batch.
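
To make the proposed contract concrete, here is a minimal sketch in plain Python, not Spark code. It assumes plain lists stand in for pd.DataFrame batches. `DoublingModel` and `enforce_batch_sizes` are hypothetical names made up for illustration; how Spark would actually enforce the contract is not decided in this issue.

```python
from typing import Callable, Iterable, Iterator, List

Batch = List[float]  # assumption: a plain list stands in for a pandas batch


class DoublingModel:
    """Hypothetical stand-in for an expensive-to-load model."""

    loads = 0  # counts how many times the "model" has been loaded

    def __init__(self) -> None:
        DoublingModel.loads += 1

    def predict(self, batch: Batch) -> Batch:
        return [2 * x for x in batch]


def predict(batch_iter: Iterator[Batch]) -> Iterator[Batch]:
    model = DoublingModel()  # loaded once per iterator, not once per batch
    for batch in batch_iter:
        yield model.predict(batch)


def enforce_batch_sizes(
    udf: Callable[[Iterator[Batch]], Iterator[Batch]],
    batches: Iterable[Batch],
) -> Iterator[Batch]:
    """Run an iterator-style UDF and check each output against its input size."""
    sizes: List[int] = []  # sizes of the batches the UDF has read so far

    def recording_iter() -> Iterator[Batch]:
        for batch in batches:
            sizes.append(len(batch))
            yield batch

    produced = 0
    for result in udf(recording_iter()):
        if produced >= len(sizes):
            raise ValueError("UDF yielded a result without reading a batch")
        if len(result) != sizes[produced]:
            raise ValueError(
                "output %d has %d rows, expected %d"
                % (produced, len(result), sizes[produced])
            )
        produced += 1
        yield result
    # Every batch the UDF read must have produced exactly one result.
    if produced != len(sizes):
        raise ValueError(
            "UDF read %d batches but yielded %d results" % (len(sizes), produced)
        )
```

Calling `list(enforce_batch_sizes(predict, batches))` over any number of batches constructs `DoublingModel` only once, which is the saving the iterator interface is meant to provide.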

Another benefit is that, with an iterator interface and Python's asyncio, users 
have the flexibility to implement data pipelining.
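
As a rough illustration of the pipelining idea, the sketch below prefetches upcoming batches while the current one is being processed. Note that it swaps in a background thread and a bounded queue rather than asyncio, purely to keep the example short. It is plain Python, not Spark code: lists stand in for pd.DataFrame batches, and `prefetch` is a hypothetical helper, not an existing API.

```python
import queue
import threading
from typing import Iterable, Iterator, List

Batch = List[float]  # assumption: a plain list stands in for a pandas batch
_DONE = object()     # sentinel marking the end of the input


class _Failure:
    """Carries an exception from the producer thread to the consumer."""

    def __init__(self, exc: Exception) -> None:
        self.exc = exc


def prefetch(batch_iter: Iterable[Batch], depth: int = 2) -> Iterator[Batch]:
    """Yield batches in order while a thread reads up to `depth` ahead."""
    buffer = queue.Queue(maxsize=depth)

    def producer() -> None:
        try:
            for batch in batch_iter:
                buffer.put(batch)  # blocks while the buffer is full
            buffer.put(_DONE)
        except Exception as exc:
            buffer.put(_Failure(exc))

    # Daemon thread: if the consumer stops early, the producer may stay
    # blocked on put() but will not keep the process alive.
    threading.Thread(target=producer, daemon=True).start()

    while True:
        item = buffer.get()
        if item is _DONE:
            return
        if isinstance(item, _Failure):
            raise item.exc
        yield item


def predict(batch_iter: Iterable[Batch]) -> Iterator[Batch]:
    # A real UDF would load its model here; doubling stands in for inference.
    for batch in prefetch(batch_iter):
        yield [2 * x for x in batch]
```

The bounded queue caps how many batches are held in memory at once, and output order still matches input order, so this kind of pipelining stays compatible with a per-batch size contract.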

cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]

  was:
Pandas UDF is the ideal connection between PySpark and deep learning (DL) model 
inference workloads. However, users need to load the model file before making 
predictions, and models of ~100MB or larger are common. If Pandas UDF execution 
is limited to one batch at a time, users have to reload the same model for every 
batch in the same Python worker process, which is inefficient.

We can instead give user code an iterator of batches, each a pd.DataFrame, and 
let it handle them:

{code}
@pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITERATOR)
def predict(batch_iter):
  model = ... # load model
  for batch in batch_iter:
    yield model.predict(batch)
{code}

Another benefit is that, with an iterator interface and Python's asyncio, users 
have the flexibility to implement data pipelining.

cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]


> Allow Pandas UDF to take an iterator of pd.DataFrames
> -----------------------------------------------------
>
>                 Key: SPARK-26412
>                 URL: https://issues.apache.org/jira/browse/SPARK-26412
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Weichen Xu
>            Priority: Major
>


