[
https://issues.apache.org/jira/browse/FLINK-39986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
raoraoxiong updated FLINK-39986:
--------------------------------
Description:
h2. Motivation
FLINK-39268 introduced CSE (Common Sub-Expression Elimination) for Java/Scala
UDFs by leveraging Calcite's built-in RexProgram normalization. However, this
optimization {*}does not apply to Python UDFs{*}.
The root cause is that PythonCalcSplitRule breaks the original RexProgram
structure during plan rewriting: it splits a single Calc into multiple Calcs
to separate Python and Java function execution. After this split, the
structural sharing that RexProgram maintains (via RexLocalRef deduplication)
is lost, and identical Python UDF calls are computed independently.
Due to the cross-process nature of Python UDF execution (Java ↔ Python via
gRPC), redundant calls are significantly more expensive than Java UDF
duplicates:
# Duplicated serialization/deserialization and cross-process communication overhead
# Redundant Python UDF computation in the Python worker
For example:
{code:sql}
SELECT python_udf(a, b) + 1, python_udf(a, b) + 2
FROM source_table
WHERE python_udf(a, b) > 0
{code}
Currently python_udf(a, b) is computed {*}3 times{*}. With CSE it should be
computed only {*}once{*}.
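The difference can be illustrated with a small, self-contained Python sketch. It is not Flink code: python_udf here is a hypothetical stand-in that multiplies its arguments, and the two eval_* functions are assumed names that mimic evaluating the query above for a single row that passes the filter.

```python
from collections import Counter

calls = Counter()


def python_udf(a, b):
    # Hypothetical stand-in for a Python UDF; counts its own invocations.
    calls["python_udf"] += 1
    return a * b


def eval_without_cse(row):
    a, b = row
    # The filter and both projections each invoke the UDF independently.
    if not python_udf(a, b) > 0:
        return None
    return (python_udf(a, b) + 1, python_udf(a, b) + 2)


def eval_with_cse(row):
    a, b = row
    shared = python_udf(a, b)  # computed once, reused by filter and projections
    if not shared > 0:
        return None
    return (shared + 1, shared + 2)


calls.clear()
eval_without_cse((2, 3))
print(calls["python_udf"])  # 3

calls.clear()
eval_with_cse((2, 3))
print(calls["python_udf"])  # 1
```

Both variants return the same row; only the number of UDF invocations differs.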
----
h2. Proposed Approach
h3. Phase 1: Intra-operator Projection-level CSE
Deduplicate identical Python UDF calls within the same PythonCalc operator:
* Introduce PythonCallDeduplicator to identify structurally identical calls in the projection
* Send only unique calls to the Python worker, using refIndex to indicate result reuse
* Add an expansion projection to map deduplicated results back to the original output schema
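A minimal sketch of the Phase 1 idea, written in Python for illustration only. The function names deduplicate_calls and expand_results are hypothetical, calls are modelled as nested tuples so that structural equality is plain tuple equality, and the meaning given to ref_index (position of the unique call whose result an output column reuses) is an assumption about how refIndex might work, not the actual design.

```python
def deduplicate_calls(calls):
    """Return (unique_calls, ref_index) for a list of hashable call descriptions.

    ref_index[i] is the position in unique_calls whose result is reused
    for output column i. (Assumed semantics, for illustration.)
    """
    unique_calls = []
    position = {}  # call -> index in unique_calls
    ref_index = []
    for call in calls:
        if call not in position:
            position[call] = len(unique_calls)
            unique_calls.append(call)
        ref_index.append(position[call])
    return unique_calls, ref_index


def expand_results(unique_results, ref_index):
    # "Expansion projection": map deduplicated results back to the
    # original output schema.
    return [unique_results[i] for i in ref_index]


projection = [
    ("python_udf", "a", "b"),
    ("other_udf", "a"),
    ("python_udf", "a", "b"),  # structurally identical to column 0
]
unique, refs = deduplicate_calls(projection)
print(unique)  # [('python_udf', 'a', 'b'), ('other_udf', 'a')]
print(refs)    # [0, 1, 0]
```

Only the entries of unique would be sent to the Python worker; expand_results then restores the three-column output from the two returned values.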
h3. Phase 2: Cross Condition/Projection CSE
When the same Python UDF appears in both WHERE and SELECT:
* Modify RemoteCalcSplitConditionRule.split() to process both condition and projection through the ScalarFunctionSplitter (condition first, then projection)
* Add deduplication logic to ScalarFunctionSplitter.getExtractedRexNode(): before extracting a new node, check if a structurally equal node was already extracted; if so, reuse its reference
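The two steps above can be sketched as follows. This is a toy Python model, not the planner code: ExtractingSplitter, get_extracted_ref and the ("ref", n) placeholder are hypothetical names, and expressions are modelled as nested tuples of the form (operator, arg, ...). The one property it aims to show is that a single splitter instance, applied to the condition first and then to the projection, hands out the same reference for structurally equal calls.

```python
class ExtractingSplitter:
    """Toy stand-in for a splitter that pulls UDF calls out of expressions."""

    def __init__(self, udf_names):
        self.udf_names = set(udf_names)
        self.extracted = []  # unique extracted calls, in extraction order
        self._index = {}     # call -> position in self.extracted

    def get_extracted_ref(self, call):
        # Reuse the existing reference if a structurally equal call
        # was already extracted; otherwise register a new one.
        if call not in self._index:
            self._index[call] = len(self.extracted)
            self.extracted.append(call)
        return ("ref", self._index[call])

    def split(self, expr):
        if not isinstance(expr, tuple):
            return expr  # column name or literal
        op, *args = expr
        if op in self.udf_names:
            return self.get_extracted_ref(expr)
        return (op, *[self.split(arg) for arg in args])


splitter = ExtractingSplitter({"python_udf"})
udf_call = ("python_udf", "a", "b")

# Condition first, then projection, through the same splitter instance.
condition = splitter.split((">", udf_call, 0))
projection = [splitter.split(("+", udf_call, 1)),
              splitter.split(("+", udf_call, 2))]

print(condition)           # ('>', ('ref', 0), 0)
print(projection)          # [('+', ('ref', 0), 1), ('+', ('ref', 0), 2)]
print(splitter.extracted)  # [('python_udf', 'a', 'b')]
```

Because the lookup table lives on the splitter rather than on each call to split, the WHERE and SELECT expressions share one extracted call.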
----
h2. Scope / Limitations
||Scenario||Supported||
|Same UDF repeated in projection|✅ Phase 1|
|Same UDF in WHERE and SELECT|✅ Phase 2|
|Nested UDF with shared sub-expression (e.g. udf2(udf1(x)) and udf1(x))|✅ Phase 1|
|Same UDF across different operators (e.g., Calc + LookupJoin)|❌ Out of scope|
|Python UDTF (Table Functions)|❌ Out of scope|
> Support Common Sub-Expression Elimination (CSE) for Python UDFs
> ---------------------------------------------------------------
>
> Key: FLINK-39986
> URL: https://issues.apache.org/jira/browse/FLINK-39986
> Project: Flink
> Issue Type: Improvement
> Components: API / Python, Table SQL / Planner
> Reporter: raoraoxiong
> Priority: Major