[ 
https://issues.apache.org/jira/browse/FLINK-39986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-39986:
-------------------------------

    Assignee: raoraoxiong

> Support Common Sub-Expression Elimination (CSE) for Python UDFs
> ---------------------------------------------------------------
>
>                 Key: FLINK-39986
>                 URL: https://issues.apache.org/jira/browse/FLINK-39986
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python, Table SQL / Planner
>            Reporter: raoraoxiong
>            Assignee: raoraoxiong
>            Priority: Major
>
> h2. Motivation
> FLINK-39268 introduced CSE (Common Sub-Expression Elimination) for Java/Scala 
> UDFs by leveraging Calcite's built-in RexProgram normalization. However, this 
> optimization {*}does not apply to Python UDFs{*}.
> The root cause is that PythonCalcSplitRule breaks the original RexProgram 
> structure during the plan rewriting phase — it splits a single Calc into 
> multiple Calcs to separate Python and Java function execution. After this 
> split, the structural sharing maintained by RexProgram (via RexLocalRef 
> deduplication) is lost, and identical Python UDF calls end up being computed 
> independently.
> Due to the cross-process nature of Python UDF execution (Java ↔ Python via 
> gRPC), redundant calls are significantly more expensive than Java UDF 
> duplicates:
>  # Duplicated serialization/deserialization and cross-process communication 
> overhead
>  # Redundant Python UDF computation in the Python worker
> For example:
>  
> {code:sql}
> SELECT python_udf(a, b) + 1, python_udf(a, b) + 2
> FROM source_table
> WHERE python_udf(a, b) > 0
> {code}
>  
> Currently python_udf(a, b) is computed {*}3 times{*}. With CSE it should be 
> computed only {*}once{*}.
> ----
> h2. Proposed Approach
> h3. Phase 1: Intra-operator Projection-level CSE
> Deduplicate identical Python UDF calls within the same PythonCalc operator:
>  * Introduce PythonCallDeduplicator to identify structurally identical calls 
> in the projection
>  * Send only unique calls to the Python worker, using refIndex to indicate 
> result reuse
>  * Add an expansion projection to map deduplicated results back to the 
> original output schema
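
The Phase 1 steps above can be sketched roughly as follows. This is only an illustration in Python, not the planner code: the function names `deduplicate_calls` and `expand_results` are hypothetical, calls are modelled as hashable tuples, and reading refIndex as "the position of the unique call whose result is reused" is an assumption about the proposal.

```python
from typing import Dict, Hashable, List, Sequence, Tuple


def deduplicate_calls(calls: Sequence[Hashable]) -> Tuple[List[Hashable], List[int]]:
    """Return the unique calls plus, per original call, the index of the
    unique call whose result it reuses (assumed meaning of refIndex)."""
    unique: List[Hashable] = []
    ref_index: List[int] = []
    first_seen: Dict[Hashable, int] = {}
    for call in calls:
        if call not in first_seen:
            # First occurrence: this call is actually sent to the Python worker.
            first_seen[call] = len(unique)
            unique.append(call)
        ref_index.append(first_seen[call])
    return unique, ref_index


def expand_results(unique_results: Sequence[object], ref_index: Sequence[int]) -> List[object]:
    """Expansion projection: map deduplicated results back to the original
    output positions."""
    return [unique_results[i] for i in ref_index]


# Hypothetical projection with one repeated Python UDF call.
projection = [("python_udf", "a", "b"), ("other_udf", "a"), ("python_udf", "a", "b")]
unique_calls, ref_index = deduplicate_calls(projection)
restored = expand_results(["r0", "r1"], ref_index)
```

Here only two calls would cross the process boundary, and the expansion step restores the three-column output schema.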
> h3. Phase 2: Cross Condition/Projection CSE
> When the same Python UDF appears in both WHERE and SELECT:
>  * Modify RemoteCalcSplitConditionRule.split() to process both condition and 
> projection through the ScalarFunctionSplitter (condition first, then 
> projection)
>  * Add deduplication logic to ScalarFunctionSplitter.getExtractedRexNode(): 
> before extracting a new node, check if a structurally equal node was already 
> extracted; if so, reuse its reference
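
The reuse check described for Phase 2 might look roughly like the sketch below. It is a simplified Python model rather than the real ScalarFunctionSplitter: the class `DedupSplitter`, its method `get_extracted_ref`, and the convention that extracted nodes are numbered after the input fields are all assumptions made for illustration.

```python
from typing import Dict, Hashable, List


class DedupSplitter:
    """Toy model of a splitter that extracts Python calls into a lower Calc
    and hands back a field reference for each extracted call."""

    def __init__(self, input_field_count: int) -> None:
        self.input_field_count = input_field_count
        self.extracted: List[Hashable] = []
        self._ref_by_node: Dict[Hashable, int] = {}

    def get_extracted_ref(self, node: Hashable) -> int:
        # Reuse the existing reference if a structurally equal node
        # (modelled here as an equal tuple) was already extracted.
        ref = self._ref_by_node.get(node)
        if ref is None:
            ref = self.input_field_count + len(self.extracted)
            self.extracted.append(node)
            self._ref_by_node[node] = ref
        return ref


# One splitter instance handles the condition first, then the projection.
splitter = DedupSplitter(input_field_count=2)
call = ("python_udf", "a", "b")
condition_ref = splitter.get_extracted_ref(call)
projection_refs = [splitter.get_extracted_ref(call), splitter.get_extracted_ref(call)]
```

Because the same splitter instance sees both the condition and the projection, the call from the WHERE clause and the two calls from the SELECT list resolve to a single extracted node.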
> ----
> h2. Scope / Limitations
> ||Scenario||Supported||
> |Same UDF repeated in projection|✅ Phase 1|
> |Same UDF in WHERE and SELECT|✅ Phase 2|
> |Nested UDF with shared sub-expression (e.g., udf2(udf1(x)) and udf1(x))|✅ Phase 1|
> |Same UDF across different operators (e.g., Calc + LookupJoin)|❌ Out of scope|
> |Python UDTF (Table Functions)|❌ Out of scope|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
