[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046958#comment-17046958 ]

Jorge Machado commented on SPARK-26412:
---------------------------------------

Thanks for the tip. It helps.

> Allow Pandas UDF to take an iterator of pd.DataFrames
> -----------------------------------------------------
>
>                 Key: SPARK-26412
>                 URL: https://issues.apache.org/jira/browse/SPARK-26412
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Weichen Xu
>            Priority: Major
>             Fix For: 3.0.0
>
> Pandas UDF is the ideal connection between PySpark and DL model inference workloads. However, the user needs to load the model file first to make predictions. It is common to see models of size ~100 MB or bigger. If the Pandas UDF execution is limited to each batch, the user needs to repeatedly load the same model for every batch in the same Python worker process, which is inefficient.
> We can provide users an iterator of batches in pd.DataFrame form and let user code handle it:
> {code}
> @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
> def predict(batch_iter):
>     model = ...  # load model
>     for batch in batch_iter:
>         yield model.predict(batch)
> {code}
> The type of each batch is:
> * a pd.Series if the UDF is called with a single non-struct-type column
> * a tuple of pd.Series if the UDF is called with more than one Spark DataFrame column
> * a pd.DataFrame if the UDF is called with a single StructType column
> Examples:
> {code}
> @pandas_udf(...)
> def evaluate(batch_iter):
>     model = ...  # load model
>     for features, label in batch_iter:
>         pred = model.predict(features)
>         yield (pred - label).abs()
>
> df.select(evaluate(col("features"), col("label")).alias("err"))
> {code}
> {code}
> @pandas_udf(...)
> def evaluate(pdf_iter):
>     model = ...  # load model
>     for pdf in pdf_iter:
>         pred = model.predict(pdf['x'])
>         yield (pred - pdf['y']).abs()
>
> df.select(evaluate(struct(col("features"), col("label"))).alias("err"))
> {code}
> If the UDF doesn't return the same number of records for the entire partition, the user should see an error. We don't require that every yield match the input batch size.
> Another benefit: with the iterator interface and asyncio from Python, users have the flexibility to implement data pipelining.
> cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
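The SCALAR_ITER contract in the description above can be sketched without Spark. This is a minimal, hypothetical simulation in which plain Python lists stand in for pd.Series batches and `Model` is an invented stand-in for a real DL model; it only illustrates that the model is loaded once per partition, not per batch:

```python
# Sketch of the SCALAR_ITER contract: the model is loaded once per worker
# process, then reused for every batch the iterator yields.
class Model:
    """Hypothetical model: doubles every value."""
    load_count = 0  # track how many times the "model file" was loaded

    def __init__(self):
        Model.load_count += 1

    def predict(self, batch):
        return [2 * x for x in batch]


def predict_udf(batch_iter):
    model = Model()            # load model once, not once per batch
    for batch in batch_iter:
        yield model.predict(batch)


batches = [[1, 2], [3, 4, 5]]  # stand-ins for pd.Series batches
results = list(predict_udf(iter(batches)))
```

After consuming the whole iterator, `Model.load_count` is 1, which is exactly the saving the iterator API is meant to provide over a per-batch UDF.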
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046542#comment-17046542 ]

Hyukjin Kwon commented on SPARK-26412:
--------------------------------------

You cannot split one iterator into multiple iterators, since an iterator is meant to be consumed only once; Python doesn't support that. You should do something like:

{code}
class SomeClass():
    def __init__(self, a, b, c):
        pass

def map_func(batch_iter):
    for a, b, c in batch_iter:
        dataset = SomeClass(a, b, c)
{code}

For JSON, you can just pass strings and call {{json.loads}}, which is pretty easy. There is no standard type for JSON in Spark, since JSON isn't part of the ANSI SQL standard. Adding a new type is a huge job because you have to think about how to serialize/deserialize it in Scala, R, and Java, for instance.
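The {{json.loads}} workaround suggested above can be sketched as follows. This is a rough, Spark-free illustration: the field names and payloads are made up, and a plain list of strings stands in for a pd.Series of string-typed values:

```python
import json

# Hypothetical sketch: instead of fixed column types, the UDF receives JSON
# strings and decodes them on the worker side with json.loads.
def map_func(batch_iter):
    for batch in batch_iter:  # each batch: a list of JSON strings
        records = [json.loads(s) for s in batch]
        # from here on we work with plain dicts
        yield [r["a"] + r["b"] for r in records]


batches = [['{"a": 1, "b": 2}', '{"a": 3, "b": 4}']]
out = list(map_func(iter(batches)))
```

Symmetrically, results can be serialized back with `json.dumps` before yielding, so the Spark-facing type stays a plain string column.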
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046373#comment-17046373 ]

Jorge Machado commented on SPARK-26412:
---------------------------------------

Well, I was thinking of something more, like handing a, b, c over to another object:

{code:java}
class SomeClass():
    def __init__(a, b, c):
        pass

def map_func(batch_iter):
    dataset = SomeClass(batch_iter[0], batch_iter[1], batch_iter[2])  # <- this does not work
{code}

And another thing: it would be great if we could just yield, for example, JSON instead of these fixed types.
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046365#comment-17046365 ]

Hyukjin Kwon commented on SPARK-26412:
--------------------------------------

You can do it via:

{code}
def map_func(batch_iter):
    for a, b, c in batch_iter:
        yield a, b, c
{code}
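A runnable sketch of the unpacking pattern above. Plain lists stand in for the pd.Series inside each tuple, and the data is invented; the point is that it is each per-batch *tuple* that unpacks into a, b, c, not the iterator itself:

```python
# Each element of batch_iter is a tuple of column batches, so the per-batch
# tuple unpacks into a, b, c -- indexing the iterator (batch_iter[0]) fails.
def map_func(batch_iter):
    for a, b, c in batch_iter:
        yield [x + y + z for x, y, z in zip(a, b, c)]


batches = [([1, 2], [10, 20], [100, 200]),  # three "columns", two rows
           ([3], [30], [300])]              # second batch, one row
sums = list(map_func(iter(batches)))
```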
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046265#comment-17046265 ]

Jorge Machado commented on SPARK-26412:
---------------------------------------

Hi, one question: when using "a tuple of pd.Series if the UDF is called with more than one Spark DF column", how can I get the Series into variables, like {{a, b, c = iterator}}? The map seems not to be a Python tuple ...
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16867954#comment-16867954 ]

Terry Kim commented on SPARK-26412:
-----------------------------------

[~WeichenXu123] and [~mengxr], do you plan to do something similar for grouped_map and grouped_agg? I am happy to investigate/work on it if this is not on your roadmap. This is to address the OOM issue.
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837353#comment-16837353 ]

Xiangrui Meng commented on SPARK-26412:
---------------------------------------

[~WeichenXu123] I updated the description.
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames or Arrow batches
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836480#comment-16836480 ]

Weichen Xu commented on SPARK-26412:
------------------------------------

Discussed with [~mengxr]: discard proposal (2). This should be something like (from the user's perspective):

{code:java}
def mapper_func(batch_iter):
    # do some initialization
    for batch in batch_iter:
        # do some computation on `batch` (a pandas DataFrame)
        # and generate `result` (also a pandas DataFrame)
        yield result

df.mapPartitions(mapper_func)
{code}
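The mapper shape above can be exercised without Spark. In this hypothetical simulation, dicts of lists stand in for pandas DataFrames and the column names `x` and `y` are made up; it only shows the one-initialization-then-stream pattern that `mapPartitions` gives the user:

```python
# Spark-free simulation of df.mapPartitions(mapper_func): the mapper sees an
# iterator over the partition's batches and yields one result per batch.
def mapper_func(batch_iter):
    state = {"batches_seen": 0}   # one-time initialization (e.g. model load)
    for batch in batch_iter:      # batch: dict of lists ~ a pandas DataFrame
        state["batches_seen"] += 1
        yield {"sum": [x + y for x, y in zip(batch["x"], batch["y"])]}


partition = [{"x": [1, 2], "y": [10, 20]},
             {"x": [3], "y": [30]}]
out = list(mapper_func(iter(partition)))
```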
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames or Arrow batches
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836377#comment-16836377 ]

Weichen Xu commented on SPARK-26412:
------------------------------------

[~mengxr] There's one issue. There are two proposals in SPARK-26412:

Proposal (1): Simply provide users the iterator of batches in pd.DataFrame or Arrow table form and let user code handle it, like:

{code:java}
@pandas_udf(StringType(), PandasUDFType.SCALAR_ITERATOR)
def udf1(batches):
    # do some initialization
    for batch in batches:
        # run some code on each row's data and get a computation result
        yield result
{code}

Proposal (2): For scalar Pandas UDFs, support "start()" and "finish()" besides "apply".

But scalar Pandas UDFs need to support computing multiple UDFs over the same row at the same time (see [https://github.com/apache/spark/blob/master/python/pyspark/worker.py#L287]), such as:

df.select(f0(df.col0), f1(df.col1, df.col2), f2(df.col3))

So proposal (1) cannot work, because the "start", "apply", and "finish" logic are mixed in one function and cannot be split. What we need is:
1) run the "start" method for each UDF f0, f1, f2 (if required)
2) for each row coming from the iterator, run the f0, f1, f2 apply methods on the row
3) run the "finish" method for each UDF f0, f1, f2 (if required)

So only proposal (2) works.
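One way to picture the three-phase evaluation argued for above. This is an invented harness, not Spark's worker code: `Udf`, `run_partition`, and the lambdas are all hypothetical, and it merely shows how a worker could interleave the `apply` calls of several UDFs over shared batches while keeping `start`/`finish` separable:

```python
# Sketch of proposal (2): each UDF exposes start/apply/finish, and the
# worker interleaves the apply calls of all UDFs over every batch.
class Udf:
    def __init__(self, fn):
        self.fn = fn
        self.started = self.finished = False

    def start(self):
        self.started = True       # e.g. load a model

    def apply(self, batch):
        return [self.fn(x) for x in batch]

    def finish(self):
        self.finished = True      # e.g. release resources


def run_partition(udfs, batch_iter):
    for u in udfs:
        u.start()
    for batch in batch_iter:
        # every UDF is applied to the same batch before moving on
        yield tuple(u.apply(batch) for u in udfs)
    for u in udfs:
        u.finish()


f0, f1 = Udf(lambda x: x + 1), Udf(lambda x: x * 2)
out = list(run_partition([f0, f1], iter([[1, 2], [3]])))
```

With a single opaque generator per UDF, the worker could not interleave f0 and f1 on the same batch this way, which is the crux of the argument against proposal (1) here.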
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822521#comment-16822521 ]

Xiangrui Meng commented on SPARK-26412:
---------------------------------------

[~bryanc] It handles the data exchange for DL model inference use cases, not so much for training.
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16751785#comment-16751785 ]

Bryan Cutler commented on SPARK-26412:
--------------------------------------

[~mengxr] I think Arrow record batches would be a much more ideal way to connect with other frameworks. The conversion to Pandas carries some overhead, and the Arrow format/types are more solidly defined. Arrow is also better suited to use with an iterator - most of the Arrow IPC mechanisms operate on streams of record batches. Is this proposal instead of SPARK-24579 (SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks)?
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743691#comment-16743691 ]

Hyukjin Kwon commented on SPARK-26412:
--------------------------------------

[~mengxr], it looks like this can be a subset of SPARK-26413. Did I understand correctly?
[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames for the entire partition
[ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725957#comment-16725957 ]

Li Jin commented on SPARK-26412:
--------------------------------

So this is similar to the mapPartitions API in Scala, but instead of an iterator of records, here we want an iterator of pd.DataFrames?