[
https://issues.apache.org/jira/browse/SPARK-53842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18029184#comment-18029184
]
Julian Klein edited comment on SPARK-53842 at 10/11/25 9:27 AM:
----------------------------------------------------------------
Hi [~lioritzhak],
I would _love_ to work on this, because I am super excited about query
optimization and want to get my feet wet working on Spark. Since this would be
my first issue for Spark, though, I might have questions along the way.
I do have another thing to work on first, and I'll have to set up my dev
environment for Spark (which might take some time).
If that does not sound too off-putting, feel free to assign this to me. I'd be
super excited to do this!
Best,
Julian
> Enable Filter Push-Down for Pandas UDFs with an Immutable Column Hint
> ----------------------------------------------------------------------
>
> Key: SPARK-53842
> URL: https://issues.apache.org/jira/browse/SPARK-53842
> Project: Spark
> Issue Type: Improvement
> Components: Optimizer, PySpark
> Affects Versions: 3.5.3, 4.0.1
> Reporter: Lior Itzhak
> Priority: Major
>
> h3. Problem Description
> Pandas UDFs ({{mapInPandas}}, {{applyInPandas}}, etc.) are powerful
> for custom data processing in PySpark. However, they currently act as a black
> box to the Catalyst Optimizer. This prevents the optimizer from pushing down
> filters on columns that pass through the UDF unmodified. As a result,
> filtering operations occur _after_ the expensive UDF execution and associated
> data shuffling, leading to significant performance degradation.
> This is especially common in pipelines where transformations are applied to
> grouped data, and the grouping key itself is not modified within the UDF.
> *Example:*
> Consider the following DataFrame and Pandas UDFs:
> {code:java}
> import pandas as pd
> from typing import Iterator
>
> df = spark.createDataFrame(
>     [["A", 1], ["A", 1], ["B", 2]],
>     schema=["id string", "value int"]
> )
>
> # UDF to modify the 'value' column
> def map_udf(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
>     for pdf in pdfs:
>         pdf["value"] = pdf["value"] + 1
>         yield pdf
>
> # UDF to aggregate data by 'id'
> def agg_udf(pdf: pd.DataFrame) -> pd.DataFrame:
>     return pdf.groupby("id").agg(count=("value", "count"))
>
> # Apply the UDFs
> modified_df = (
>     df
>     .mapInPandas(map_udf, schema="id string,value int")
>     .groupby("id")
>     .applyInPandas(agg_udf, schema="id string,count int")
> )
>
> # Filter the result
> modified_df.where("id == 'A'").explain()
> {code}
> In this example, the {{id}} column is never modified by either UDF. However,
> the filter on {{id}} is applied only after all transformations are complete.
> *Current Physical Plan:*
> The physical plan shows the {{Filter}} operation at the very top, processing
> data that has already been scanned, shuffled, and processed by both Pandas
> UDFs.
>
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (isnotnull(id#20) AND (id#20 = A))
>    +- FlatMapGroupsInPandas [id#13], agg_udf(id#13, value#14)#19, [id#20, count#21]
>       +- Sort [id#13 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#13, 200), ENSURE_REQUIREMENTS, [plan_id=20]
>             +- Project [id#13, id#13, value#14]
>                +- MapInPandas map_udf(id string#8, value int#9L)#12, [id#13, value#14], false
>                   +- Scan ExistingRDD[id string#8,value int#9L]
> {code}
> This plan processes all data for both {{id = 'A'}} and {{id = 'B'}} through
> the entire pipeline, even though the data for {{'B'}} is discarded at the end.
> h3. Proposed Solution
> We propose introducing a mechanism to *hint* to the Catalyst Optimizer that
> specific columns within a Pandas UDF are immutable or pass through without
> modification. This would allow the optimizer to safely push down filters on
> these columns.
> This could be implemented as a new parameter in the UDF registration, for
> example, {{passthrough_cols}}:
>
> {code:java}
> # Proposed API modification
> modified_df = (
>     df
>     .mapInPandas(
>         map_udf,
>         schema="id string,value int",
>         passthrough_cols=["id"]  # New hint parameter
>     )
>     .groupby("id")
>     .applyInPandas(
>         agg_udf,
>         schema="id string,count int",
>         passthrough_cols=["id"]  # New hint parameter
>     )
> )
> {code}
> With this hint, the optimizer could transform the physical plan to apply the
> filter at the data source, _before_ any expensive operations.
> *Desired Physical Plan:*
>
> {code:java}
> == Desired Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- FlatMapGroupsInPandas [id#13], agg_udf(id#13, value#14)#19, [id#20, count#21]
>    +- Sort [id#13 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#13, 200), ENSURE_REQUIREMENTS, [plan_id=20]
>          +- Project [id#13, id#13, value#14]
>             +- MapInPandas map_udf(id string#8, value int#9L)#12, [id#13, value#14], false
>                +- Filter (isnotnull(id#8) AND (id#8 = A))   // <-- FILTER PUSHED DOWN
>                   +- Scan ExistingRDD[id string#8,value int#9L]
> {code}
> This optimized plan would significantly reduce the amount of data sent to the
> UDFs and shuffled across the network, resulting in major performance
> improvements.
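>
> For comparison, roughly this plan shape can already be obtained today by
> filtering manually _before_ the UDFs are applied, which is exactly the rewrite
> the hint would let the optimizer perform automatically. The sketch below
> reuses {{df}}, {{map_udf}}, and {{agg_udf}} from the example above (note the
> backticks, since the example's input columns are literally named "id string"
> and "value int"):
>
> {code:java}
> # Manual filter push-down: filter the input before the UDFs run.
> # The proposed passthrough_cols hint would let Catalyst do this rewrite itself.
> manually_pushed_df = (
>     df
>     .where("`id string` == 'A'")
>     .mapInPandas(map_udf, schema="id string,value int")
>     .groupby("id")
>     .applyInPandas(agg_udf, schema="id string,count int")
> )
> manually_pushed_df.explain()
> {code}
> As noted under Usability below, this manual rewrite is not always available,
> e.g. when the filter is applied by downstream code that only sees the already
> transformed DataFrame.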
> h3. Motivation & Justification
> # *Performance:* In large-scale data processing pipelines, filtering data
> early is one of the most effective optimization strategies. Enabling filter
> push-down for Pandas UDFs would unlock substantial performance gains,
> reducing I/O, network traffic, and computational load.
> # *Common Use Case:* Developers often know with certainty that grouping keys
> or other identifier columns are not modified within their UDFs. The proposed
> hint provides a direct means of communicating this domain knowledge to the
> optimizer.
> # *Usability:* This feature would empower developers to optimize their
> pipelines in scenarios where they cannot change an incoming plan and can only
> apply transformations to a given DataFrame.
> h3. Optional: Runtime Validation
> To safeguard against incorrect usage of the hint, Spark could optionally
> perform a runtime validation. This check would verify that the values in the
> columns marked as {{passthrough_cols}} are indeed unchanged between the input
> and output of the UDF. If a discrepancy is found (e.g., a value in the output
> {{id}} column did not exist in the input {{id}} column for that batch), Spark
> could raise an exception. While not entirely foolproof, this would cover most
> grouping and mapping use cases and prevent subtle bugs.
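>
> To illustrate, one possible shape of such a check is sketched below. The
> {{validate_passthrough}} helper and its name are hypothetical (not an existing
> Spark API); it wraps a group-map UDF of the kind passed to {{applyInPandas}}
> and enforces the contract described above on each group:
>
> {code:java}
> import pandas as pd
> from typing import Callable, List
>
> def validate_passthrough(func: Callable[[pd.DataFrame], pd.DataFrame],
>                          passthrough_cols: List[str]) -> Callable[[pd.DataFrame], pd.DataFrame]:
>     """Hypothetical wrapper for an applyInPandas-style group-map UDF.
>
>     Raises if the output contains a value in a hinted column that did not
>     exist in the same column of the input group, i.e. the immutable-column
>     contract was violated.
>     """
>     def wrapped(pdf: pd.DataFrame) -> pd.DataFrame:
>         # Values the UDF is allowed to emit in each passthrough column.
>         allowed = {c: set(pdf[c].unique()) for c in passthrough_cols}
>         out = func(pdf)
>         for c in passthrough_cols:
>             unexpected = set(out[c].unique()) - allowed[c]
>             if unexpected:
>                 raise ValueError(
>                     f"Column '{c}' is hinted as passthrough, but the UDF "
>                     f"produced values not present in its input: {sorted(unexpected)}")
>         return out
>     return wrapped
> {code}
> The check runs once per group (or per batch, for the {{mapInPandas}} variant)
> and only touches the hinted columns, so its cost should be small compared to
> the UDF itself; as noted above, it is not entirely foolproof, but it would
> catch the common mistake of accidentally rewriting a grouping key.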
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]