[ https://issues.apache.org/jira/browse/FLINK-39986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu reassigned FLINK-39986:
-------------------------------
Assignee: raoraoxiong
> Support Common Sub-Expression Elimination (CSE) for Python UDFs
> ---------------------------------------------------------------
>
> Key: FLINK-39986
> URL: https://issues.apache.org/jira/browse/FLINK-39986
> Project: Flink
> Issue Type: Improvement
> Components: API / Python, Table SQL / Planner
> Reporter: raoraoxiong
> Assignee: raoraoxiong
> Priority: Major
>
> h2. Motivation
> FLINK-39268 introduced CSE (Common Sub-Expression Elimination) for Java/Scala
> UDFs by leveraging Calcite's built-in RexProgram normalization. However, this
> optimization {*}does not apply to Python UDFs{*}.
> The root cause is that PythonCalcSplitRule breaks the original RexProgram
> structure during the plan rewriting phase — it splits a single Calc into
> multiple Calcs to separate Python and Java function execution. After this
> split, the structural sharing maintained by RexProgram (via RexLocalRef
> deduplication) is lost, and identical Python UDF calls end up being computed
> independently.
> Because Python UDFs run in a separate process (Java ↔ Python via gRPC), each
> redundant call costs significantly more than a duplicated Java UDF call:
> # Duplicated serialization/deserialization and cross-process communication
> overhead
> # Redundant Python UDF computation in the Python worker
> For example:
>
> {code:sql}
> SELECT python_udf(a, b) + 1, python_udf(a, b) + 2
> FROM source_table
> WHERE python_udf(a, b) > 0
> {code}
>
> Currently python_udf(a, b) is computed {*}3 times{*}: once for the WHERE
> condition and twice in the projection. With CSE it should be computed only
> {*}once{*}.
> ----
> h2. Proposed Approach
> h3. Phase 1: Intra-operator Projection-level CSE
> Deduplicate identical Python UDF calls within the same PythonCalc operator:
> * Introduce PythonCallDeduplicator to identify structurally identical calls
> in the projection
> * Send only unique calls to the Python worker, using refIndex to indicate
> result reuse
> * Add an expansion projection to map deduplicated results back to the
> original output schema
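>
> The three Phase 1 steps can be sketched roughly as follows. This is a toy
> model in Python, not the proposed Java implementation: calls are modelled as
> nested tuples, and the names deduplicate, expand and ref_index are
> hypothetical stand-ins (ref_index only approximates what refIndex might carry).
>
```python
from typing import List, Tuple

# Hypothetical stand-in for a Python UDF call: (function name, argument tuple).
# Arguments are input column names or nested calls, so equal tuples are
# structurally identical calls.
Call = Tuple[str, tuple]


def deduplicate(calls: List[Call]) -> Tuple[List[Call], List[int]]:
    """Return (unique_calls, ref_index).

    ref_index[i] is the position in unique_calls whose result should be
    reused for the i-th call of the original projection.
    """
    unique_calls: List[Call] = []
    position = {}  # structural key -> index in unique_calls
    ref_index: List[int] = []
    for call in calls:
        if call not in position:  # tuples hash and compare structurally
            position[call] = len(unique_calls)
            unique_calls.append(call)
        ref_index.append(position[call])
    return unique_calls, ref_index


def expand(unique_results: list, ref_index: List[int]) -> list:
    """Expansion projection: map deduplicated results back to the original schema."""
    return [unique_results[i] for i in ref_index]


# Original projection with a repeated call.
projection = [
    ("python_udf", ("a", "b")),
    ("python_udf", ("a", "b")),
    ("other_udf", ("a",)),
]
unique_calls, ref_index = deduplicate(projection)
```
>
> Only unique_calls would be sent to the Python worker; expand() then restores
> the original column count and order from the worker's results.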
> h3. Phase 2: Cross Condition/Projection CSE
> When the same Python UDF appears in both WHERE and SELECT:
> * Modify RemoteCalcSplitConditionRule.split() to process both condition and
> projection through the ScalarFunctionSplitter (condition first, then
> projection)
> * Add deduplication logic to ScalarFunctionSplitter.getExtractedRexNode():
> before extracting a new node, check if a structurally equal node was already
> extracted; if so, reuse its reference
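>
> A rough sketch of the reuse-on-extraction idea, again as a toy Python model
> rather than the actual planner code. The Splitter class, the "$f0" reference
> names and the PYTHON_UDFS set are assumptions made for illustration; nested
> Python calls are extracted whole to keep the example short.
>
```python
from typing import Dict, List, Union

# Column name, literal, or (operator/function name, *operands).
Expr = Union[str, int, tuple]

PYTHON_UDFS = {"python_udf"}  # hypothetical set of Python function names


class Splitter:
    """Toy analogue of a function splitter that moves Python calls to a lower Calc."""

    def __init__(self) -> None:
        self.extracted: List[tuple] = []   # calls the lower Calc computes once
        self._refs: Dict[tuple, str] = {}  # structural key -> reference name

    def get_extracted_node(self, call: tuple) -> str:
        # Reuse the reference if a structurally equal call was already extracted.
        if call not in self._refs:
            self._refs[call] = f"$f{len(self.extracted)}"
            self.extracted.append(call)
        return self._refs[call]

    def rewrite(self, expr: Expr) -> Expr:
        if not isinstance(expr, tuple):
            return expr  # column name or literal
        if expr[0] in PYTHON_UDFS:
            return self.get_extracted_node(expr)
        head, *operands = expr
        return (head, *[self.rewrite(o) for o in operands])


udf = ("python_udf", "a", "b")
condition = (">", udf, 0)
projection = [("+", udf, 1), ("+", udf, 2)]

splitter = Splitter()
new_condition = splitter.rewrite(condition)  # condition first
new_projection = [splitter.rewrite(p) for p in projection]  # then projection
```
>
> Because one splitter instance handles both the condition and the projection,
> all three occurrences resolve to the same reference and the call is extracted
> once.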
> ----
> h2. Scope / Limitations
> ||Scenario||Supported||
> |Same UDF repeated in projection|✅ Phase 1|
> |Same UDF in WHERE and SELECT|✅ Phase 2|
> |Nested UDF with shared sub-expression (e.g., udf2(udf1(x)) and udf1(x))|✅ Phase 1|
> |Same UDF across different operators (e.g., Calc + LookupJoin)|❌ Out of scope|
> |Python UDTF (Table Functions)|❌ Out of scope|