[ https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaetan updated SPARK-33519:
---------------------------
    Issue Type: New Feature  (was: Wish)

> Batch UDF in Scala
> ------------------
>
>                 Key: SPARK-33519
>                 URL: https://issues.apache.org/jira/browse/SPARK-33519
>             Project: Spark
>          Issue Type: New Feature
>          Components: Optimizer, Spark Core
>    Affects Versions: 3.0.0, 3.0.1
>            Reporter: Gaetan
>            Priority: Major
>
> Hello,
> Unlike Python, there is only one type of Scala UDF: it lets us define a
> Scala function to apply to a set of Columns, and it is called +for each
> row+. One advantage of a Scala UDF over mapPartitions is that Catalyst can
> see what the inputs are, which enables column pruning, predicate pushdown
> and other optimization rules. But in some use cases there is a setup phase
> that we want to execute only once per worker, right before processing
> inputs. For such use cases a Scala UDF is not well suited, and
> mapPartitions is used instead, like this:
>  
> {code:java}
> ds.mapPartitions { it =>
>   setup()      // runs once per partition, before any input is processed
>   process(it)  // then the whole iterator of inputs is handled in one call
> }{code}
> After looking at the code, I found that Python UDFs are implemented via
> query plans that retrieve an RDD from their children and call mapPartitions
> on that RDD to work with batches of inputs. These query plans are generated
> by Catalyst, which extracts the Python UDFs (rule ExtractPythonUDFs).
>  
> Like Python UDFs, Scala batch UDFs could be implemented with query plans
> that work on a batch of inputs instead of a single input. What do you
> think? A minimal sketch of what such an API might look like is given below.
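> Here is a hypothetical sketch of such an API, only to make the idea
> concrete: batchUdf, setup and processOne do not exist in Spark today; the
> names and signatures are assumptions for illustration.
> {code:java}
> import org.apache.spark.sql.functions.col
> 
> // Hypothetical "batch" UDF: the function receives an iterator over all
> // inputs of a partition instead of being invoked once per row.
> val myBatchUdf = batchUdf[(String, Int), String] { inputs =>
>   setup()  // hypothetical setup, run once per partition
>   inputs.map { case (a, b) => processOne(a, b) }  // hypothetical per-row work
> }
> 
> // Catalyst would still see which columns are read, so column pruning and
> // predicate pushdown would keep working:
> df.select(myBatchUdf(col("a"), col("b")))
> {code}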
> Here is a short description of one of our use cases of Spark that could
> greatly benefit from Scala batch UDFs:
> We use Spark to distribute a computation that runs in C#. To do so, we call
> mapPartitions on the DataFrame that holds our data. Inside mapPartitions,
> we:
>  * first connect to the C# process;
>  * then iterate over the inputs, sending each input to the C# process and
> getting back the result (see the sketch below).
> The use of mapPartitions was motivated by the setup (the connection to the
> C# process), which happens once per partition.
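> In code, the current approach looks roughly like this; connectToCSharp and
> sendAndReceive are hypothetical stand-ins for our actual C# bridge:
> {code:java}
> ds.mapPartitions { rows =>
>   val conn = connectToCSharp()                // setup, once per partition
>   rows.map(row => sendAndReceive(conn, row))  // one round trip per input
> }{code}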
> Now that we have a first working version, we would like to improve it by
> limiting the columns that are read. We don't want to select the columns
> required by our computation right before the mapPartitions, because that
> would drop columns that other transformations in the workflow may still
> need. Instead, we would like to rely on Catalyst for column pruning,
> predicate pushdown and other optimization rules. Replacing the
> mapPartitions with a Scala UDF would not be efficient, because we would
> connect to the C# process for each row. An alternative would be a Scala
> "batch" UDF that is applied only to the columns needed by our computation,
> so as to benefit from Catalyst and its optimization rules, and whose input
> would be an iterator, as with mapPartitions.
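> With the hypothetical batchUdf sketched above, this use case could look as
> follows (again, every name here is an assumption, not an existing API):
> {code:java}
> // Catalyst would see that only columns "a" and "b" are read, so column
> // pruning still applies, while the connection is opened once per partition.
> val runInCSharp = batchUdf[(String, Int), String] { inputs =>
>   val conn = connectToCSharp()  // hypothetical setup, once per batch
>   inputs.map(input => sendAndReceive(conn, input))
> }
> df.withColumn("result", runInCSharp(col("a"), col("b")))
> {code}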



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
