raoraoxiong created FLINK-39986:
-----------------------------------
Summary: Support Common Sub-Expression Elimination (CSE) for Python UDFs
Key: FLINK-39986
URL: https://issues.apache.org/jira/browse/FLINK-39986
Project: Flink
Issue Type: Improvement
Components: API / Python, Table SQL / Planner
Reporter: raoraoxiong
h2. Motivation
FLINK-39268 introduced CSE (Common Sub-Expression Elimination) for Java/Scala
UDFs by leveraging Calcite's built-in RexProgram normalization. However, this
optimization {*}does not apply to Python UDFs{*}.
The root cause is that PythonCalcSplitRule breaks the original RexProgram structure during plan rewriting: it splits a single Calc into multiple Calcs so that Python and Java functions are executed separately. After this split, the structural sharing that RexProgram maintains (through RexLocalRef deduplication) is lost, and identical Python UDF calls are computed independently.
Because Python UDF execution is cross-process (Java ↔ Python via gRPC), a duplicated Python UDF call is significantly more expensive than a duplicated Java UDF call. Each redundant call adds:
# Extra serialization/deserialization and cross-process communication overhead
# Redundant computation of the UDF in the Python worker
For example:
{code:sql}
SELECT python_udf(a, b) + 1, python_udf(a, b) + 2
FROM source_table
WHERE python_udf(a, b) > 0
{code}
Currently python_udf(a, b) is computed {*}3 times{*}. With CSE it should be
computed only {*}once{*}.
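A toy model of the example query makes the count concrete. It assumes a hypothetical python_udf (here simply a - b, instrumented to count its invocations) and a plan that evaluates the WHERE condition before the projection; no Flink API is involved.

```python
from typing import Callable, List, Optional, Tuple


def make_counting_udf() -> Tuple[Callable[[int, int], int], List[int]]:
    """Hypothetical stand-in for python_udf that records how often it runs."""
    calls = [0]

    def python_udf(a: int, b: int) -> int:
        calls[0] += 1
        return a - b  # arbitrary body, chosen only for illustration

    return python_udf, calls


def evaluate_row_naive(row: Tuple[int, int], udf) -> Optional[Tuple[int, int]]:
    a, b = row
    # Without CSE, every occurrence of python_udf(a, b) is evaluated on its own.
    if not udf(a, b) > 0:
        return None
    return (udf(a, b) + 1, udf(a, b) + 2)


def evaluate_row_cse(row: Tuple[int, int], udf) -> Optional[Tuple[int, int]]:
    a, b = row
    # With CSE, the common sub-expression is evaluated once and its result reused.
    shared = udf(a, b)
    if not shared > 0:
        return None
    return (shared + 1, shared + 2)
```

For a row that passes the filter, such as (5, 2), the naive version invokes the UDF three times and the CSE version once; both return (4, 5).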
----
h2. Proposed Approach
h3. Phase 1: Intra-operator Projection-level CSE
Deduplicate identical Python UDF calls within the same PythonCalc operator:
* Introduce PythonCallDeduplicator to identify structurally identical calls in the projection
* Send only unique calls to the Python worker, using a refIndex to indicate which result each duplicate reuses
* Add an expansion projection to map the deduplicated results back to the original output schema
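A minimal sketch of the Phase 1 idea, assuming a hypothetical nested-tuple encoding of calls instead of Calcite RexNodes. deduplicate_calls and expand are illustrative names, not the proposed PythonCallDeduplicator API, and the real refIndex encoding may differ. Structurally identical calls collapse to one entry in the list sent to the worker, and the expansion step rebuilds the original output order.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical expression encoding (not Flink's RexNode): nested tuples such as
# ("call", "python_udf", ("field", 0), ("field", 1)). Tuples compare and hash
# structurally, so equal tuples stand for structurally identical calls.
Expr = Tuple


def deduplicate_calls(calls: Sequence[Expr]) -> Tuple[List[Expr], List[int]]:
    """Return the unique calls plus, for each original call, the index of the
    unique call whose result it reuses (a refIndex-like mapping)."""
    unique: List[Expr] = []
    position: Dict[Expr, int] = {}
    ref_index: List[int] = []
    for call in calls:
        if call not in position:
            position[call] = len(unique)
            unique.append(call)
        ref_index.append(position[call])
    return unique, ref_index


def expand(unique_results: Sequence[object], ref_index: Sequence[int]) -> List[object]:
    """Expansion projection: map deduplicated results back to the original schema."""
    return [unique_results[i] for i in ref_index]
```

For a projection of python_udf(a, b), python_udf(a, c), python_udf(a, b), only two calls are sent and ref_index is [0, 1, 0]; expand([r0, r1], [0, 1, 0]) yields [r0, r1, r0].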
h3. Phase 2: Cross Condition/Projection CSE
When the same Python UDF call appears in both the WHERE and the SELECT clause:
* Modify RemoteCalcSplitConditionRule.split() to process both the condition and the projection through the ScalarFunctionSplitter (condition first, then projection)
* Add deduplication logic to ScalarFunctionSplitter.getExtractedRexNode(): before extracting a new node, check whether a structurally equal node has already been extracted; if so, reuse its reference
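The reuse check in getExtractedRexNode() can be modelled as follows. This is a sketch under stated assumptions: DedupingExtractor and its nested-tuple expressions are hypothetical stand-ins for ScalarFunctionSplitter and RexNode, and a Python call with nested Python operands is extracted as a whole. Rewriting the condition first and the projection second with one shared extractor lets the projection reuse the reference created for the condition.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical encoding: ("call", name, *args), ("field", i), ("lit", v).
Expr = Tuple


class DedupingExtractor:
    """Hypothetical model of extraction with reuse: Python calls are moved into a
    shared `extracted` list and replaced by ("extracted", k) references; a call
    structurally equal to one already extracted reuses the existing reference."""

    def __init__(self, python_functions: Iterable[str]) -> None:
        self.python_functions = set(python_functions)
        self.extracted: List[Expr] = []
        self._index: Dict[Expr, int] = {}

    def _extract(self, node: Expr) -> Expr:
        # Reuse the reference of a structurally equal node if one exists.
        if node not in self._index:
            self._index[node] = len(self.extracted)
            self.extracted.append(node)
        return ("extracted", self._index[node])

    def rewrite(self, expr: Expr) -> Expr:
        if expr[0] == "call" and expr[1] in self.python_functions:
            return self._extract(expr)
        if expr[0] == "call":
            # Non-Python call: keep it and rewrite its operands recursively.
            return ("call", expr[1]) + tuple(self.rewrite(arg) for arg in expr[2:])
        return expr  # fields and literals are left unchanged
```

For the example query, rewriting python_udf(a, b) > 0 and then the two projection expressions yields a single extracted call, referenced as ("extracted", 0) from all three places.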
----
h2. Scope / Limitations
||Scenario||Supported||
|Same UDF repeated in projection|✅ Phase 1|
|Same UDF in WHERE and SELECT|✅ Phase 2|
|Nested UDFs sharing a sub-expression (e.g., udf2(udf1(x)) and udf1(x))|✅ Phase 1|
|Same UDF across different operators (e.g., Calc + LookupJoin)|❌ Out of scope|
|Python UDTF (Table Functions)|❌ Out of scope|
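The nested case in the table (udf2(udf1(x)) together with udf1(x)) needs sharing at the sub-expression level, not only between whole top-level calls. One way to picture it is bottom-up hash-consing, similar in spirit to how RexProgram shares sub-expressions through local references. to_program and the tuple encoding below are hypothetical and only illustrate the idea, not the planned implementation.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical encoding: ("call", name, *args), ("field", i), ("lit", v).
Expr = Tuple


def to_program(exprs: Sequence[Expr]) -> Tuple[List[Expr], List[int]]:
    """Bottom-up hash-consing: every distinct sub-expression is stored once in
    `slots`, and calls refer to their operands by slot index ("$", k), so udf1(x)
    is shared even when it also appears nested inside udf2(udf1(x))."""
    slots: List[Expr] = []
    index: Dict[Expr, int] = {}

    def intern(node: Expr) -> int:
        if node[0] == "call":
            # Intern operands first, then key the call on its operands' slot refs.
            node = ("call", node[1]) + tuple(("$", intern(arg)) for arg in node[2:])
        if node not in index:
            index[node] = len(slots)
            slots.append(node)
        return index[node]

    roots = [intern(e) for e in exprs]
    return slots, roots
```

For the outputs udf2(udf1(x)) and udf1(x), this produces three slots (x, udf1($0), udf2($1)) and roots [2, 1], so udf1(x) appears, and would be evaluated, only once.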
--
This message was sent by Atlassian Jira
(v8.20.10#820010)