Shardul Mahadik created SPARK-40262:
---------------------------------------

             Summary: Expensive UDF evaluation pushed down past a join leads to performance issues
                 Key: SPARK-40262
                 URL: https://issues.apache.org/jira/browse/SPARK-40262
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Shardul Mahadik


Consider a Spark job with an expensive UDF that looks as follows:
{code:scala}
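// Stand-in for an expensive UDF; it returns Option[Int], so its output is nullable.
val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))

spark.range(10).write.format("orc").save("/tmp/orc")

// The UDF is applied after the first join, and its output is the key of the second join.
val df = spark.read.format("orc").load("/tmp/orc").as("a")
    .join(spark.range(10).as("b"), "id")
    .withColumn("udf_op", expensive_udf($"a.id"))
    .join(spark.range(10).as("c"), $"udf_op" === $"c.id")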
{code}
This creates a physical plan as follows:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false
   :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
   :  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
   :     :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)))))
   :     :  +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416]
   :        +- Range (0, 10, step=1, splits=16)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420]
      +- Range (0, 10, step=1, splits=16)
{code}
In this plan, the expensive UDF call appears three times: in the Project, in 
the Filter, and in the scan's DataFilters. Since the UDF output is used in a 
later join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the 
UDF output. The pushdown rules then duplicate the UDF call and push it below 
the earlier join. The duplication behaviour [is 
documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196]
 and is not in itself a huge issue. But when that join is highly restrictive, 
the UDF is evaluated on many orders of magnitude more rows than necessary, 
which slows down the job.
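
To make the cost difference concrete, here is a minimal sketch in plain Scala collections (not Spark) that counts calls to a stand-in UDF under the two plan shapes. The object name `UdfPushdownCost`, its two methods, and the row counts are hypothetical and chosen only for illustration; the sketch models the shape of the plan above, not how Spark actually executes it.

```scala
// Toy model in plain Scala collections (not Spark): counts how often a
// stand-in for the expensive UDF runs under two plan shapes.
object UdfPushdownCost {

  // Shape from the plan above: the inferred IS NOT NULL filter evaluates the
  // UDF below the join, and the Project above the join evaluates it again.
  def pushedBelowJoin(left: Vector[Int], joinKeys: Set[Int]): (Vector[Option[Int]], Long) = {
    var calls = 0L
    def expensiveUdf(i: Int): Option[Int] = { calls += 1; Option(i) }
    val filtered = left.filter(i => expensiveUdf(i).isDefined) // pushed-down filter
    val joined = filtered.filter(joinKeys.contains)            // restrictive join
    val projected = joined.map(expensiveUdf)                    // projection after the join
    (projected, calls)
  }

  // Alternative shape: join first, then evaluate the UDF once per surviving
  // row and apply the null check to the already computed value.
  def evaluatedAfterJoin(left: Vector[Int], joinKeys: Set[Int]): (Vector[Option[Int]], Long) = {
    var calls = 0L
    def expensiveUdf(i: Int): Option[Int] = { calls += 1; Option(i) }
    val joined = left.filter(joinKeys.contains)
    val projected = joined.map(expensiveUdf).filter(_.isDefined)
    (projected, calls)
  }
}
```

For example, with `(0 until 100000).toVector` as the left side and `Set(7, 42, 99)` as the join keys, both methods return the same three rows, but the first makes 100003 UDF calls and the second makes 3.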

Can we avoid this duplication of UDF calls? In SPARK-37392, we made a [similar 
change|https://github.com/apache/spark/pull/34823/files] where we decided to 
only add inferred filters if the input is an attribute. Should we use a similar 
strategy for `InferFiltersFromConstraints`?
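
One possible reading of this suggestion, sketched on a toy expression type rather than on Catalyst: infer an `IS NOT NULL` filter only when the constrained expression is a plain attribute, so that no UDF call is ever copied into a filter. Every name below (`InferNotNullSketch`, `Attribute`, `UdfCall`, `IsNotNull`, `inferAll`, `inferAttributesOnly`) is hypothetical and defined in the snippet itself; none of it is Spark's API or the actual change made in SPARK-37392.

```scala
// Toy model (not Catalyst): contrast inferring IS NOT NULL for every
// constrained expression with inferring it only for plain attributes.
object InferNotNullSketch {
  sealed trait Expr
  final case class Attribute(name: String) extends Expr
  final case class UdfCall(name: String, arg: Expr) extends Expr
  final case class IsNotNull(child: Expr) extends Expr

  // Behaviour described in this report: every expression that must be
  // non-null gets a filter, including ones that wrap a UDF call.
  def inferAll(exprs: Seq[Expr]): Seq[IsNotNull] =
    exprs.map(e => IsNotNull(e))

  // Suggested restriction: keep only filters whose input is an attribute,
  // so the inferred filter never re-evaluates a UDF.
  def inferAttributesOnly(exprs: Seq[Expr]): Seq[IsNotNull] =
    exprs.collect { case a: Attribute => IsNotNull(a) }
}
```

Given the expressions `Attribute("id")` and `UdfCall("expensive_udf", Attribute("id"))`, `inferAll` yields two filters, one of which repeats the UDF call, while `inferAttributesOnly` yields only `IsNotNull(Attribute("id"))`. The trade-off is that the null check on the UDF output is no longer available for pushdown.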


