[ https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gaetan updated SPARK-33519:
---------------------------
    Issue Type: New Feature  (was: Wish)

> Batch UDF in scala
> ------------------
>
>                 Key: SPARK-33519
>                 URL: https://issues.apache.org/jira/browse/SPARK-33519
>             Project: Spark
>          Issue Type: New Feature
>          Components: Optimizer, Spark Core
>    Affects Versions: 3.0.0, 3.0.1
>            Reporter: Gaetan
>            Priority: Major
>
> Hello,
> Contrary to Python, there is only one kind of Scala UDF: it lets us define a
> Scala function to apply to a set of Columns, and it is called +for each
> row+. One advantage of Scala UDFs over mapPartitions is that Catalyst can
> see what the inputs are, which is then used for column pruning, predicate
> pushdown and other optimization rules. But in some use cases there is a
> setup phase that we only want to execute once per worker, right before
> processing the inputs. For such use cases a Scala UDF is not well suited,
> and mapPartitions is used instead, like this:
>
> {code:scala}
> ds.mapPartitions { it =>
>   setup()
>   process(it)
> }
> {code}
> After having looked at the code, I found that Python UDFs are implemented
> via query plans that retrieve an RDD from their children and call
> mapPartitions on that RDD to work with batches of inputs. These query plans
> are generated by Catalyst by extracting the Python UDFs (rule
> ExtractPythonUDFs).
>
> Like Python UDFs, we could implement Scala batch UDFs with query plans that
> work on a batch of inputs instead of one input at a time. What do you think?
> Here is a short description of one of our use cases of Spark that could
> greatly benefit from Scala batch UDFs:
> We are using Spark to distribute some computation run in C#. To do so, we
> call the method mapPartitions of the DataFrame that represents our data.
> Inside mapPartitions, we:
> * First connect to the C# process
> * Then iterate over the inputs, sending each input to the C# process and
> getting back the results.
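The two steps above can be sketched in plain Scala without a Spark dependency; `ExternalProcess` and `processPartition` are illustrative stand-ins for the real C# bridge, not actual APIs:

```scala
// Hypothetical sketch of the per-partition pattern described above.
// ExternalProcess stands in for the connection to the C# process.
class ExternalProcess {
  // Stand-in for one round-trip to the external process.
  def send(row: String): String = row.toUpperCase
}

object PartitionBridge {
  // Body that would be passed to Dataset.mapPartitions: connect once,
  // then stream every row of the partition through the connection.
  def processPartition(rows: Iterator[String]): Iterator[String] = {
    val conn = new ExternalProcess() // setup happens once per partition
    rows.map(conn.send)              // one round-trip per row
  }

  def main(args: Array[String]): Unit =
    println(processPartition(Iterator("a", "b")).mkString(",")) // A,B
}
```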
> The use of mapPartitions was motivated by the setup (the connection to the
> C# process), which happens once per partition.
> Now that we have a first working version, we would like to improve it by
> limiting the columns that are read. We don't want to select the required
> columns right before the mapPartitions call, because that would drop
> columns that could be required by other transformations in the workflow.
> Instead, we would like to take advantage of Catalyst for column pruning,
> predicate pushdown and other optimization rules.
> Using a Scala UDF to replace the mapPartitions would not be efficient,
> because we would connect to the C# process for each row. An alternative
> would be a Scala "batch" UDF, applied to the columns needed for our
> computation so as to take advantage of Catalyst and its optimization rules,
> and whose input would be an iterator, as with mapPartitions.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
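The cost argument above can be made concrete with a small plain-Scala sketch (no Spark; all names illustrative): a per-row UDF would pay the connection setup for every row, while a mapPartitions-style batch pays it once per partition.

```scala
// Hypothetical sketch contrasting the two call patterns. "setups" counts
// how often the expensive setup (e.g. connecting to the C# process) runs.
object SetupCost {
  // Per-row UDF pattern: setup would run once for every row.
  def runPerRow(partitions: Seq[Seq[Int]]): Int = {
    var setups = 0
    partitions.foreach(_.foreach { _ => setups += 1 /* connect per row */ })
    setups
  }

  // mapPartitions / batch-UDF pattern: setup runs once per partition.
  def runPerPartition(partitions: Seq[Seq[Int]]): Int = {
    var setups = 0
    partitions.foreach { _ => setups += 1 /* connect once per partition */ }
    setups
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(Seq(1, 2, 3), Seq(4, 5)) // two partitions, five rows
    println(runPerRow(data))       // 5 setups: one per row
    println(runPerPartition(data)) // 2 setups: one per partition
  }
}
```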