If the UDFs are computationally expensive, I wouldn't solve this problem with
UDFs at all. If they work in an iterative manner, and assuming each iteration
is independent of the others (yes, I know that's a big assumption), I would
think about exploding your dataframe to have one row per iteration, working on
each row separately, and then aggregating at the end. This lets your
computation scale much better.
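As a rough sketch of what I mean (the dataframe df, the columns id and params,
and the fixed 10 iterations are all made-up placeholders; the real
per-iteration logic would replace the simple expression here):

    from pyspark.sql import functions as F

    # one row per (id, iteration) instead of looping inside a single expensive UDF
    iters = df.withColumn("iteration", F.explode(F.sequence(F.lit(0), F.lit(9))))

    # per-iteration work as ordinary column expressions (stand-in for the real logic)
    iters = iters.withColumn("partial", F.col("params") * F.col("iteration"))

    # fold the independent iteration results back down to one row per id
    result = iters.groupBy("id").agg(F.sum("partial").alias("total"))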

I know not all computations are map-reducible like that. However, most are.

Split-and-merge data workflows in Spark don't behave like their DAG
representations unless you add costly caches. Without caching, each split
results in Spark re-reading the data from the source, even if the splits are
later merged back together. The only way to avoid that is to cache at the
split point, which, depending on the amount of data, can become costly. Also,
joins result in shuffles. Avoiding splits and merges is better.
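To make that concrete, here is a hedged sketch of the diamond shape being
discussed; the path, the id join key, and the column names are placeholders,
and foo/bar are the example UDFs from the thread below:

    base = spark.read.parquet("/path/to/source").cache()   # placeholder path

    left  = base.withColumn("a", foo("x"))
    right = base.withColumn("b", bar("x"))

    # without the cache() above, each branch re-reads the source;
    # the join itself still costs a shuffle (hash/sort-merge by default)
    merged = left.join(right, on="id")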

To give you an example, we had an application that applied a series of rules
to rows. The required output was a dataframe with an additional column
indicating which rule each row satisfied. In our initial implementation, we
processed the rules one at a time: for N rules, we created N dataframes, each
containing the rows that satisfied one rule, and then unioned the N
dataframes. Horrible performance that didn't scale with N. We reimplemented it
to add N Boolean columns, one per rule, indicating whether the rule was
satisfied. We just kept adding the Boolean columns to the dataframe. After
iterating over the rules, we added another column indicating which rule was
satisfied, and then dropped the Boolean columns. Much better performance that
scaled with N. Spark read from the data source just once, and since there were
no joins/unions, there was no shuffle.
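Roughly, the second approach looked like this (the rule names and rule
expressions here are invented for illustration; our real rules were more
involved):

    from pyspark.sql import functions as F

    rules = {                                   # placeholder rule expressions
        "rule_1": F.col("amount") > 100,
        "rule_2": F.col("country") == "US",
    }

    # add one Boolean column per rule -- no joins, no unions
    for name, cond in rules.items():
        df = df.withColumn(name, cond)

    # pick the first rule each row satisfies (null if none matched),
    # then drop the intermediate Boolean columns
    matched = F.coalesce(*[F.when(F.col(n), F.lit(n)) for n in rules])
    df = df.withColumn("matched_rule", matched).drop(*rules.keys())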

On 5/17/21, 2:56 PM, "Andrew Melo" <andrew.m...@gmail.com> wrote:


    In our case, these UDFs are quite expensive and worked on in an
    iterative manner, so being able to cache the two "sides" of the graphs
    independently will speed up the development cycle. Otherwise, if you
    modify foo() here, then you have to recompute bar and baz, even though
    they're unchanged.

    df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))

    Additionally, a longer goal would be to be able to persist/cache these
    columns to disk so a downstream user could later mix and match several
    (10s) of these columns together as their inputs w/o having to
    explicitly compute them themselves.

    Cheers
    Andrew

    On Mon, May 17, 2021 at 1:10 PM Sean Owen <sro...@gmail.com> wrote:
    >
    > Why join here - just add two columns to the DataFrame directly?
    >
    > On Mon, May 17, 2021 at 1:04 PM Andrew Melo <andrew.m...@gmail.com> wrote:
    >>
    >> Anyone have ideas about the below Q?
    >>
    >> It seems to me that given that "diamond" DAG, that spark could see
    >> that the rows haven't been shuffled/filtered, it could do some type of
    >> "zip join" to push them together, but I've not been able to get a plan
    >> that doesn't do a hash/sort merge join
    >>
    >> Cheers
    >> Andrew
    >>
