[ https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601021#comment-17601021 ]

Shardul Mahadik commented on SPARK-40262:
-----------------------------------------

[~cloud_fan] [~viirya] [~joshrosen] Gentle ping on this!

> Expensive UDF evaluation pushed down past a join leads to performance issues 
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-40262
>                 URL: https://issues.apache.org/jira/browse/SPARK-40262
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Shardul Mahadik
>            Priority: Major
>
> Consider a Spark job with an expensive UDF that looks as follows:
> {code:scala}
> val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))
> spark.range(10).write.format("orc").save("/tmp/orc")
> val df = spark.read.format("orc").load("/tmp/orc").as("a")
>     .join(spark.range(10).as("b"), "id")
>     .withColumn("udf_op", expensive_udf($"a.id"))
>     .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
> {code}
> This creates a physical plan as follows:
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false
>    :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
>    :  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
>    :     :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)))))
>    :     :  +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
>    :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416]
>    :        +- Range (0, 10, step=1, splits=16)
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420]
>       +- Range (0, 10, step=1, splits=16)
> {code}
> In this case, the expensive UDF call is duplicated thrice. Since the UDF
> output is used in a later join, `InferFiltersFromConstraints` adds an `IS
> NOT NULL` filter on the UDF output, and the pushdown rules then duplicate
> this UDF call and push it below the earlier join. The duplication behaviour
> [is documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196]
> and is not a huge issue in itself. But given a highly restrictive join, the
> UDF gets evaluated on many orders of magnitude more rows than necessary,
> slowing down the job.
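>
> A possible user-side mitigation (a hedged sketch, not a fix for the
> optimizer itself): marking the UDF as non-deterministic should keep the
> optimizer from inferring and pushing filters built on top of it, since
> constraint inference and predicate pushdown only consider deterministic
> expressions, at the cost of losing other optimizations that rely on
> determinism.
> {code:scala}
> import org.apache.spark.sql.functions.udf
>
> // Workaround sketch: an equivalent UDF marked non-deterministic, so the
> // optimizer does not infer an IS NOT NULL filter over its output or push
> // the call below the first join. This trades the duplicated evaluation
> // for weaker optimization elsewhere.
> val expensiveUdf = udf((i: Int) => Option(i)).asNondeterministic()
>
> val df = spark.read.format("orc").load("/tmp/orc").as("a")
>   .join(spark.range(10).as("b"), "id")
>   .withColumn("udf_op", expensiveUdf($"a.id"))
>   .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
> {code}
>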
> Can we avoid this duplication of UDF calls? In SPARK-37392, we made a 
> [similar change|https://github.com/apache/spark/pull/34823/files] where we 
> decided to only add inferred filters if the input is an attribute. Should we 
> use a similar strategy for `InferFiltersFromConstraints`?
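>
> A minimal sketch of what such a guard might look like (hypothetical helper,
> not the actual rule code): keep inferred `IsNotNull` constraints only when
> they are over a bare attribute, so derived expressions such as UDF calls are
> never duplicated by inference.
> {code:scala}
> import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, IsNotNull}
>
> // Hypothetical helper illustrating the proposed restriction: drop inferred
> // IS NOT NULL constraints whose child is anything other than a plain
> // attribute reference (e.g. a UDF call or a cast of one).
> def keepAttributeOnlyIsNotNulls(inferred: Set[Expression]): Set[Expression] =
>   inferred.filter {
>     case IsNotNull(child) => child.isInstanceOf[Attribute]
>     case _                => true
>   }
> {code}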


