raoraoxiong created FLINK-39986:
-----------------------------------

             Summary: Support Common Sub-Expression Elimination (CSE) for Python UDFs
                 Key: FLINK-39986
                 URL: https://issues.apache.org/jira/browse/FLINK-39986
             Project: Flink
          Issue Type: Improvement
          Components: API / Python, Table SQL / Planner
            Reporter: raoraoxiong


h2. Motivation

FLINK-39268 introduced CSE (Common Sub-Expression Elimination) for Java/Scala UDFs by leveraging Calcite's built-in RexProgram normalization. However, this optimization {*}does not apply to Python UDFs{*}.

The root cause is that PythonCalcSplitRule breaks the original RexProgram structure during plan rewriting: to separate Python and Java function execution, it splits a single Calc into multiple Calcs. After this split, the structural sharing that RexProgram maintains (identical expressions are stored once and referenced through RexLocalRef) is lost, so identical Python UDF calls are computed independently.

Because Python UDF execution is cross-process (Java ↔ Python via gRPC), each redundant call is significantly more expensive than a duplicated Java UDF call:
 # Duplicated serialization/deserialization and cross-process communication overhead
 # Redundant Python UDF computation in the Python worker

For example:

{code:sql}
SELECT python_udf(a, b) + 1, python_udf(a, b) + 2
FROM source_table
WHERE python_udf(a, b) > 0
{code}

Currently, python_udf(a, b) is computed up to {*}3 times{*} per input row. With CSE, it would be computed only {*}once{*}.
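To make the count concrete, here is a small Python sketch that evaluates the query above row by row, once with every occurrence of python_udf(a, b) evaluated separately and once with a per-row reuse standing in for CSE. The UDF body (a - b), the CountingUdf wrapper and the sample rows are made up for illustration; only the call pattern is taken from the query, and this is not how Flink physically executes the plan.

```python
class CountingUdf:
    """Hypothetical stand-in for python_udf(a, b) that counts its invocations."""

    def __init__(self):
        self.calls = 0

    def __call__(self, a, b):
        self.calls += 1
        return a - b


def run_naive(udf, rows):
    """Every occurrence of python_udf(a, b) in the query is evaluated separately."""
    out = []
    for a, b in rows:
        if udf(a, b) > 0:                               # WHERE python_udf(a, b) > 0
            out.append((udf(a, b) + 1, udf(a, b) + 2))  # SELECT ... + 1, ... + 2
    return out


def run_with_cse(udf, rows):
    """The shared call is evaluated once per row and reused by WHERE and SELECT."""
    out = []
    for a, b in rows:
        v = udf(a, b)
        if v > 0:
            out.append((v + 1, v + 2))
    return out


rows = [(5, 1), (2, 3), (7, 4)]  # made-up (a, b) rows; (2, 3) is filtered out
naive, cse = CountingUdf(), CountingUdf()
assert run_naive(naive, rows) == run_with_cse(cse, rows)  # same results either way
print(naive.calls, cse.calls)  # 7 3
```

With these rows the naive loop makes 3 calls for each of the two rows that pass the filter and 1 call for the row that does not, versus one call per row with reuse.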
----
h2. Proposed Approach
h3. Phase 1: Intra-operator Projection-level CSE

Deduplicate identical Python UDF calls within the same PythonCalc operator:
 * Introduce a PythonCallDeduplicator to identify structurally identical calls in the projection
 * Send only the unique calls to the Python worker, using a refIndex to indicate which outputs reuse an earlier result
 * Add an expansion projection that maps the deduplicated results back to the original output schema
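
The three Phase 1 steps can be sketched as follows. This is a minimal illustration written in Python, not the planner code: a call is modeled as a hypothetical (function name, arguments) tuple so that tuple equality stands in for structural equality, and ref_index[i] is assumed to be the position of the unique call whose result output column i reuses. The actual refIndex encoding in the proposal may differ, and deduplicate, expand and non_deterministic are illustrative names.

```python
from typing import Dict, FrozenSet, Hashable, List, Sequence, Tuple

# Hypothetical structural form of a Python UDF call: (function name, argument tuple),
# where arguments are input field references such as "$0".
Call = Tuple[str, Tuple[Hashable, ...]]


def deduplicate(
    projection: Sequence[Call],
    non_deterministic: FrozenSet[str] = frozenset(),
) -> Tuple[List[Call], List[int]]:
    """Return the unique calls to send to the Python worker and, for each
    projected column, the index of the unique call whose result it reuses."""
    first_seen: Dict[Call, int] = {}  # structural key -> position in `unique`
    unique: List[Call] = []
    ref_index: List[int] = []
    for call in projection:
        name = call[0]
        if name in non_deterministic or call not in first_seen:
            # New call (or one that must never be shared): give it its own slot.
            if name not in non_deterministic:
                first_seen[call] = len(unique)
            ref_index.append(len(unique))
            unique.append(call)
        else:
            ref_index.append(first_seen[call])  # reuse the earlier result
    return unique, ref_index


def expand(unique_results: Sequence[object], ref_index: Sequence[int]) -> List[object]:
    """Expansion projection: rebuild the original output row from deduplicated results."""
    return [unique_results[i] for i in ref_index]


projection = [("python_udf", ("$0", "$1")), ("other_udf", ("$2",)), ("python_udf", ("$0", "$1"))]
unique, ref_index = deduplicate(projection)
print(len(unique), ref_index)        # 2 [0, 1, 0]
print(expand([42, "x"], ref_index))  # [42, 'x', 42]
```

The non_deterministic parameter is an addition of this sketch: merging calls is only safe for deterministic functions, so a real deduplicator would presumably need a similar guard.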

h3. Phase 2: Cross Condition/Projection CSE

When the same Python UDF call appears in both WHERE and SELECT:
 * Modify RemoteCalcSplitConditionRule.split() to pass both the condition and the projection through the ScalarFunctionSplitter (condition first, then projection)
 * Add deduplication logic to ScalarFunctionSplitter.getExtractedRexNode(): before extracting a new node, check whether a structurally equal node has already been extracted; if so, reuse its reference
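
A minimal sketch of the Phase 2 idea, again in Python with hypothetical names: SplitterSketch, extracted_ref, split, PYTHON_FUNCS and the $pyN references are illustrative only and are not Flink's ScalarFunctionSplitter API. The condition is rewritten first, then the projection, and the extraction step looks up a structurally equal call before creating a new one.

```python
from typing import Dict, List

# Hypothetical: function names the planner would treat as Python UDFs.
PYTHON_FUNCS = {"python_udf"}


class SplitterSketch:
    """Simplified stand-in for ScalarFunctionSplitter: replaces each Python UDF call
    with a reference to a column computed by a separate Python Calc."""

    def __init__(self) -> None:
        self.extracted: List[tuple] = []        # calls pushed down to the Python Calc
        self._positions: Dict[tuple, int] = {}  # structural key -> extracted position

    def extracted_ref(self, call: tuple) -> str:
        """Analogue of getExtractedRexNode() with the proposed dedup check."""
        if call not in self._positions:         # not seen before: extract it
            self._positions[call] = len(self.extracted)
            self.extracted.append(call)
        return f"$py{self._positions[call]}"    # otherwise reuse the existing reference

    def rewrite(self, expr):
        if not isinstance(expr, tuple):         # field reference or literal
            return expr
        name, args = expr
        if name in PYTHON_FUNCS:
            return self.extracted_ref(expr)
        return (name, tuple(self.rewrite(a) for a in args))


def split(condition, projection):
    """Process the condition first, then the projection, with one shared splitter."""
    splitter = SplitterSketch()
    new_condition = splitter.rewrite(condition)
    new_projection = [splitter.rewrite(p) for p in projection]
    return splitter.extracted, new_condition, new_projection


udf = ("python_udf", ("$0", "$1"))
extracted, cond, proj = split((">", (udf, 0)), [("+", (udf, 1)), ("+", (udf, 2))])
print(len(extracted), cond, proj)
# 1 ('>', ('$py0', 0)) [('+', ('$py0', 1)), ('+', ('$py0', 2))]
```

Because the condition is processed first, the WHERE occurrence claims the extracted slot and both SELECT occurrences resolve to the same reference.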

----
h2. Scope / Limitations
||Scenario||Supported||
|Same UDF repeated in projection|✅ Phase 1|
|Same UDF in WHERE and SELECT|✅ Phase 2|
|Nested UDF with shared sub-expression (e.g., udf2(udf1(x)) and udf1(x))|✅ Phase 1|
|Same UDF across different operators (e.g., Calc + LookupJoin)|❌ Out of scope|
|Python UDTF (Table Functions)|❌ Out of scope|
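
The nested row in the table can be handled by interning calls bottom-up (hash-consing), so that the udf1(x) inside udf2(udf1(x)) and a standalone udf1(x) map to the same slot. The sketch below is an assumption about how this could work, not a description of the proposed PythonCallDeduplicator; CallTable and evaluate are hypothetical names, and arguments are limited to field references and nested calls.

```python
from typing import Callable, Dict, List, Tuple, Union

# Hypothetical expression form: a "$0"-style field reference, or (func_name, args).
Expr = Union[str, Tuple[str, tuple]]


class CallTable:
    """Bottom-up interning of Python UDF calls: each distinct call, including calls
    nested inside other calls, gets one slot and is evaluated once per row."""

    def __init__(self) -> None:
        self.slots: List[Tuple[str, tuple]] = []  # evaluation order: inner calls first
        self._ids: Dict[Tuple[str, tuple], int] = {}

    def intern(self, expr: Expr) -> Union[str, int]:
        if isinstance(expr, str):  # input field reference, kept as-is
            return expr
        name, args = expr
        key = (name, tuple(self.intern(a) for a in args))  # children interned first
        if key not in self._ids:
            self._ids[key] = len(self.slots)
            self.slots.append(key)
        return self._ids[key]


def evaluate(table: CallTable, udfs: Dict[str, Callable[..., object]],
             row: Dict[str, object]) -> List[object]:
    """Evaluate every slot exactly once, in order, for one input row."""
    results: List[object] = []
    for name, args in table.slots:
        values = [row[a] if isinstance(a, str) else results[a] for a in args]
        results.append(udfs[name](*values))
    return results


table = CallTable()
outer = table.intern(("udf2", (("udf1", ("$0",)),)))  # udf2(udf1(x))
inner = table.intern(("udf1", ("$0",)))               # udf1(x)
print(inner, outer, table.slots)  # 0 1 [('udf1', ('$0',)), ('udf2', (0,))]
print(evaluate(table, {"udf1": lambda x: x + 1, "udf2": lambda y: y * 10}, {"$0": 4}))  # [5, 50]
```

Slots are appended only after their arguments are interned, so evaluating them in list order always has the inner results available.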



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
