Gaetan created SPARK-33519:
------------------------------

             Summary: Batch UDF in scala
                 Key: SPARK-33519
                 URL: https://issues.apache.org/jira/browse/SPARK-33519
             Project: Spark
          Issue Type: Wish
          Components: Optimizer, Spark Core
    Affects Versions: 3.0.1, 3.0.0
            Reporter: Gaetan


Hello,

Contrary to Python, there is only one type of Scala UDF: it lets us define a 
Scala function to apply on a set of Columns and is called +for each row+. One 
advantage of Scala UDFs over mapPartitions is that Catalyst can see what the 
inputs are, which is then used for column pruning, predicate pushdown and 
other optimization rules. But in some use cases there is a setup phase that we 
only want to execute once per worker, right before processing the inputs. For 
such use cases a Scala UDF is not well suited, and mapPartitions is used 
instead, like this:

 
{code:scala}
ds.mapPartitions(
  it => {
    // Setup executed once per partition (e.g. open a connection)
    setup()
    // Process the whole iterator of rows with the setup already in place
    process(it)
  }
){code}
After having looked at the code, I found that Python UDFs are implemented via 
query plans that retrieve an RDD from their children and call mapPartitions on 
that RDD to work with batches of inputs. These query plans are generated by 
Catalyst by extracting the Python UDFs (rule ExtractPythonUDFs).
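
To make the idea concrete, here is a purely conceptual sketch (not the actual 
Spark source) of what such a generated plan does at the RDD level; the name 
evalInBatches and its signature are only an illustration:

{code:scala}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Conceptual sketch only: the generated plan takes the RDD produced by its
// child and evaluates the UDF over each partition as a whole, so inputs are
// handled in batches instead of one row at a time.
def evalInBatches[I, O: ClassTag](child: RDD[I])(
    evalBatch: Iterator[I] => Iterator[O]): RDD[O] =
  child.mapPartitions(rows => evalBatch(rows))
{code}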

 

As for Python UDFs, we could implement Scala batch UDFs with query plans that 
work on a batch of inputs instead of one input at a time. What do you think?
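
For illustration only, here is a rough sketch of what the user-facing shape 
could look like. Nothing below exists in Spark today: the ScalaBatchUDF trait 
and the batchUdf factory are hypothetical names.

{code:scala}
import org.apache.spark.sql.Column

// Hypothetical shape: a batch UDF is built from an Iterator => Iterator
// function, mirroring mapPartitions, but it is applied to named Columns so
// that Catalyst still knows exactly which inputs are consumed.
trait ScalaBatchUDF[I, O] {
  def apply(cols: Column*): Column
}

// Hypothetical factory, analogous to org.apache.spark.sql.functions.udf:
//   def batchUdf[I, O](f: Iterator[I] => Iterator[O]): ScalaBatchUDF[I, O]
{code}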

Here is a short description of one of our Spark use cases that could greatly 
benefit from Scala batch UDFs:

We are using Spark to distribute some computation that runs in C#. To do so, 
we call mapPartitions on the DataFrame that represents our data. Inside 
mapPartitions, we:
 * First connect to the C# process
 * Then iterate over the inputs, sending each input to the C# process and 
getting back the results.

The use of mapPartitions was motivated by the setup (the connection to the C# 
process) that happens once per partition, as sketched below.
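
A minimal sketch of this pattern, assuming a hypothetical CSharpClient helper 
that wraps the connection and the per-row exchange with the C# process, and a 
DataFrame with a string column and a double column:

{code:scala}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical helper: wraps the connection to the external C# process.
class CSharpClient {
  def compute(name: String, value: Double): Double = ??? // real IPC call here
}

def scoreWithCSharp(spark: SparkSession, df: DataFrame): Dataset[Double] = {
  import spark.implicits._
  df.as[(String, Double)].mapPartitions { rows =>
    val client = new CSharpClient()        // setup: connect once per partition
    rows.map { case (name, value) =>
      client.compute(name, value)          // send each input, read the result
    }
  }
}
{code}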

Now that we have a first working version, we would like to improve it by 
limiting the columns that are read. We don't want to manually select the 
columns required by our computation right before the mapPartitions, because 
that would drop columns that other transformations in the workflow may still 
need. Instead, we would like to take advantage of Catalyst for column pruning, 
predicate pushdown and other optimization rules. Replacing the mapPartitions 
with a Scala UDF would not be efficient, because we would connect to the C# 
process for each row. An alternative would be a Scala "batch" UDF, applied to 
the columns needed by our computation so as to take advantage of Catalyst and 
its optimization rules, and whose input would be an iterator, like 
mapPartitions.
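
With the hypothetical batchUdf and CSharpClient sketched above, the same 
processing could be expressed as a column expression, leaving the column 
selection to Catalyst; every name here is illustrative, not an existing API:

{code:scala}
// Illustrative only: batchUdf and CSharpClient are the hypothetical names
// introduced above, not an existing Spark API.
//
//   val score = batchUdf[(String, Double), Double] { rows =>
//     val client = new CSharpClient()     // setup: connect once per partition
//     rows.map { case (name, value) => client.compute(name, value) }
//   }
//
//   // Catalyst sees that only "name" and "value" are consumed and can prune
//   // the other columns, while the rest of the workflow keeps its columns.
//   df.withColumn("score", score(df("name"), df("value")))
{code}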


