[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16735492#comment-16735492 ]

Ohad Raviv commented on SPARK-18748:
------------------------------------

We're encountering this same problem once again, now with Spark Structured Streaming.
The typical way to read and parse is something like:

{code:java}
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", topic)
  .load()
  .select(parsingUDF(col("value")).as("parsed_struct"))
  .selectExpr("parsed_struct.*")
{code}

The ".*" expansion causes the UDF to run as many times as there are columns 
in the struct, as sketched below. We typically have dozens of columns, 
meaning dozens of parses per incoming message.
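To illustrate (this expansion sketch is not from the original comment - the 
field names f1 and f2 are hypothetical, and df stands for the loaded Kafka 
DataFrame above), after the optimizer collapses the two projections the 
query effectively computes:

{code:java}
// Illustrative only: what the collapsed plan effectively computes,
// assuming hypothetical struct fields f1, f2, ...
df.select(
  parsingUDF(col("value")).getField("f1").as("f1"),
  parsingUDF(col("value")).getField("f2").as("f2")
  // ... one full parse of "value" for every expanded column
)
{code}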
Here we can't use any of the bypass solutions mentioned above and in 
SPARK-17728, since .cache() and .rdd are unsupported operations on a 
structured streaming DataFrame.
[~cloud_fan], [~hvanhovell] - maybe you have an idea for a workaround in 
the streaming case as well?
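
One possible direction, offered here only as a sketch rather than a 
confirmed fix: since Spark 2.3 a UDF can be marked nondeterministic via 
UserDefinedFunction.asNondeterministic(), which stops the optimizer from 
collapsing the two projections and duplicating the call. Assuming a 
hypothetical parse function standing in for the real parser, and the 
brokerAddress/topic values from above:

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("parse-once").getOrCreate()

// Hypothetical stand-in for the expensive parser discussed in this thread.
case class Parsed(f1: String, f2: String)
def parse(value: String): Parsed = Parsed(value, value.reverse)

// asNondeterministic() (Spark 2.3+) keeps the optimizer from inlining the
// UDF once per expanded column, so each message is parsed a single time.
val parsingUDF = udf(parse _).asNondeterministic()

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress) // assumed defined elsewhere
  .option("subscribe", topic)                       // assumed defined elsewhere
  .load()
  .select(parsingUDF(col("value").cast("string")).as("parsed_struct"))
  .selectExpr("parsed_struct.*")
{code}

Whether this helps depends on the optimizer version, so it's worth checking 
with .explain() that the parse appears only once in the plan.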



> UDF multiple evaluations causes very poor performance
> -----------------------------------------------------
>
>                 Key: SPARK-18748
>                 URL: https://issues.apache.org/jira/browse/SPARK-18748
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> We have a use case with a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {code:java}
> def veryExpensiveCalc(str: String) = { println("blahblah1"); "nothing" }
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is not null and c<>''").show
> {code}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +-------+
> |      c|
> +-------+
> |nothing|
> +-------+
> {quote}
> You can see that for each reference to column "c" you get the println.
> That causes very poor performance in our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic workarounds:
> 1. cache() after the first time, e.g.:
> {code:java}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not null and c<>''").show
> {code}
> While it works, in our case we can't do that because the table is too big 
> to cache.
> 2. Move back and forth to an RDD:
> {code:java}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and c<>''").show
> {code}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?


