[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
[ https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601021#comment-17601021 ]

Shardul Mahadik commented on SPARK-40262:
-----------------------------------------

[~cloud_fan] [~viirya] [~joshrosen] Gentle ping on this!

> Expensive UDF evaluation pushed down past a join leads to performance issues
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40262
>                 URL: https://issues.apache.org/jira/browse/SPARK-40262
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Shardul Mahadik
>            Priority: Major
>
> Consider a Spark job with an expensive UDF that looks like the following:
> {code:scala}
> val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))
> spark.range(10).write.format("orc").save("/tmp/orc")
> val df = spark.read.format("orc").load("/tmp/orc").as("a")
>   .join(spark.range(10).as("b"), "id")
>   .withColumn("udf_op", expensive_udf($"a.id"))
>   .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
> {code}
> This produces the following physical plan:
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false
>    :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
>    :  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
>    :     :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)
>    :     :  +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct
>    :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416]
>    :        +- Range (0, 10, step=1, splits=16)
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420]
>       +- Range (0, 10, step=1, splits=16)
> {code}
> In this plan, the expensive UDF call is duplicated three times. Since the UDF output is used in a later join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the UDF output, and the pushdown rules then duplicate the UDF call and push it past the earlier join. The duplication behaviour [is documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196] and is not in itself a huge issue. But given a highly restrictive join, the UDF gets evaluated on orders of magnitude more rows than it should be, slowing down the job.
>
> Can we avoid this duplication of UDF calls? In SPARK-37392, we made a [similar change|https://github.com/apache/spark/pull/34823/files] where we decided to only add inferred filters if the input is an attribute. Should we use a similar strategy for `InferFiltersFromConstraints`?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
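To see why pushdown duplicates the call, here is a toy sketch of what pushing a predicate through a Project does. The case classes are hypothetical stand-ins for illustration, not Spark's actual Catalyst API: a filter that references the alias `udf_op` gets rewritten in terms of the alias's child expression, which copies the UDF call below the Project.

```scala
// Toy expression tree (illustrative only, not Catalyst classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class UdfCall(name: String, child: Expr) extends Expr
case class IsNotNull(child: Expr) extends Expr

// Rewrite a predicate on the Project's output in terms of its inputs:
// each reference to an alias is replaced by the aliased expression.
def pushThroughProject(pred: Expr, aliases: Map[String, Expr]): Expr = pred match {
  case Attr(n)       => aliases.getOrElse(n, Attr(n))
  case UdfCall(n, c) => UdfCall(n, pushThroughProject(c, aliases))
  case IsNotNull(c)  => IsNotNull(pushThroughProject(c, aliases))
}

// The Project defines: udf_op := expensive_udf(id)
val aliases = Map("udf_op" -> UdfCall("expensive_udf", Attr("id")))

// The inferred filter IS NOT NULL(udf_op), once pushed below the
// Project, carries its own copy of the expensive UDF call.
val pushed = pushThroughProject(IsNotNull(Attr("udf_op")), aliases)
```

In the real plan this happens once per pushdown target, which is how the single `expensive_udf` call in the query becomes three evaluations in the physical plan.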
[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
[ https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598655#comment-17598655 ]

Erik Krogen commented on SPARK-40262:
-------------------------------------

Good find and thanks for sharing the investigation [~shardulm]! I agree that a similar strategy as SPARK-37392 makes sense here. cc also [~viirya] and [~joshrosen], who were involved in SPARK-37392.
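The SPARK-37392-style guard discussed in this thread can be sketched as follows. This is an illustrative stand-alone model with hypothetical types, not the actual `InferFiltersFromConstraints` rule: only infer an `IS NOT NULL` filter when the null-intolerant expression bottoms out in a plain attribute (possibly under casts), so candidates containing UDF calls are skipped and never get duplicated by pushdown.

```scala
// Illustrative sketch of the guard (hypothetical types, not Spark code).
sealed trait Expression
case class AttrRef(name: String) extends Expression
case class Cast(child: Expression) extends Expression
case class Udf(name: String, child: Expression) extends Expression

// Only bare attributes (possibly wrapped in casts) are cheap enough to
// re-evaluate in an inferred filter; anything containing a UDF is not.
def isAttributeOnly(e: Expression): Boolean = e match {
  case AttrRef(_) => true
  case Cast(c)    => isAttributeOnly(c)
  case Udf(_, _)  => false
}

// Candidate constraints from a join condition like udf_op === c.id:
val candidates = Seq(
  AttrRef("id"),                             // keep: infer IS NOT NULL(id)
  Udf("expensive_udf", Cast(AttrRef("id")))  // skip: contains a UDF call
)
val inferred = candidates.filter(isAttributeOnly)
```

Under this guard the `IS NOT NULL` on the UDF output would simply not be inferred, trading a potentially useful filter for predictable UDF evaluation cost.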
[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
[ https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597436#comment-17597436 ]

Shardul Mahadik commented on SPARK-40262:
-----------------------------------------

cc: [~cloud_fan] [~xkrogen] [~mridulm80]