nateab opened a new pull request, #27506:
URL: https://github.com/apache/flink/pull/27506

   …iple times
   
   When a non-deterministic UDF (e.g., RAND(), UUID()) appears in both the 
WHERE clause and SELECT clause of a query, the UDF was being evaluated multiple 
times per row - once for the filter condition and again for the projection. 
This caused incorrect behavior where a row that passes the filter condition 
could have a different UDF value in the output.
   
   This fix adds caching for common non-deterministic subexpressions between 
filter and projection:
   
   1. Added reusableSubExprCache to CodeGeneratorContext to cache pre-generated 
expressions by their RexNode string representation.
   
   2. Modified CalcCodeGenerator.generateProcessCode() to identify 
non-deterministic expressions that appear in both condition and projection, and 
pre-generate them BEFORE the filter condition is generated (critical for the 
cache to be populated).
   
   3. Modified ExprCodeGenerator.generateExpression() to check the cache first 
and return the cached result (with NO_CODE) to avoid duplicate evaluation. Also 
modified visitCall to use generateExpression for operand processing to ensure 
cache lookups happen for subexpressions.
   
   4. Added testNonDeterministicUdfInWhereAndSelect test to verify the fix.
   
   ## What is the purpose of the change
   
   This pull request fixes FLINK-29855 where non-deterministic UDFs (e.g., 
`RAND()`, `UUID()`) appearing in both the WHERE clause and SELECT clause of a 
query were being evaluated multiple times per row. This caused incorrect 
behavior where a row that passes the filter condition could have 
a different UDF value in the output, because the UDF was called once for 
filtering and again for projection.
   
   For example, with the query:
   ```sql
   SELECT uuid() FROM t WHERE uuid() IS NOT NULL
   ```
   The UUID in the output would be different from the one used in the filter 
check, which is semantically incorrect.
   
   ## Brief change log
   
   - Added reusableSubExprCache to CodeGeneratorContext to cache pre-generated 
expressions by their RexNode string representation
   - Added preGenerateCommonNonDeterministicExprs() method in CalcCodeGenerator 
to identify non-deterministic expressions that appear in both condition and 
projection, and pre-generate them before processing
   - Modified CalcCodeGenerator.generateProcessCode() to call the 
pre-generation method BEFORE generating the filter condition (critical for 
cache population)
   - Modified ExprCodeGenerator.generateExpression() to check the cache first 
and return cached results with NO_CODE to avoid duplicate evaluation
   - Modified ExprCodeGenerator.visitCall() to use generateExpression() for 
operand processing to ensure cache lookups happen for subexpressions
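
The caching idea in the change log can be illustrated with a small, self-contained sketch. This is not the code from the pull request: `SubExprCacheSketch`, `Expr`, `Generated`, and the `useCache` flag are hypothetical stand-ins for the planner classes, and the "generated code" is just a string. It only shows why shared non-deterministic expressions have to be pre-generated before the condition, and how a cache hit returns the existing result variable with no code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the planner's code generator; not Flink's classes.
class SubExprCacheSketch {

    /** A function call with a determinism flag, loosely playing the role of a RexNode. */
    static final class Expr {
        final String name;
        final boolean deterministic;
        final List<Expr> operands;

        Expr(String name, boolean deterministic, Expr... operands) {
            this.name = name;
            this.deterministic = deterministic;
            this.operands = Arrays.asList(operands);
        }

        /** String form used as the cache key. */
        String digest() {
            List<String> parts = new ArrayList<>();
            for (Expr op : operands) parts.add(op.digest());
            return name + "(" + String.join(", ", parts) + ")";
        }
    }

    /** Generated statements plus the name of the variable holding the result. */
    static final class Generated {
        final String code;
        final String resultTerm;

        Generated(String code, String resultTerm) {
            this.code = code;
            this.resultTerm = resultTerm;
        }
    }

    private final Map<String, Generated> cache = new HashMap<>();
    private int nextId = 0;

    Generated generate(Expr e) {
        Generated cached = cache.get(e.digest());
        if (cached != null) {
            // Already evaluated earlier: reuse the variable and emit no code.
            return new Generated("", cached.resultTerm);
        }
        StringBuilder code = new StringBuilder();
        List<String> args = new ArrayList<>();
        for (Expr op : e.operands) {
            Generated g = generate(op); // operands go through the cache lookup too
            code.append(g.code);
            args.add(g.resultTerm);
        }
        String term = "v" + nextId++;
        code.append("Object ").append(term).append(" = ").append(e.name)
            .append("(").append(String.join(", ", args)).append(");\n");
        return new Generated(code.toString(), term);
    }

    /** Collects non-deterministic subexpressions, operands first (post-order). */
    static void collect(Expr e, Map<String, Expr> out) {
        for (Expr op : e.operands) collect(op, out);
        if (!e.deterministic) out.put(e.digest(), e);
    }

    String generateProcessCode(Expr condition, Expr projection, boolean useCache) {
        StringBuilder out = new StringBuilder();
        if (useCache) {
            Map<String, Expr> inCondition = new LinkedHashMap<>();
            Map<String, Expr> inProjection = new LinkedHashMap<>();
            collect(condition, inCondition);
            collect(projection, inProjection);
            // Pre-generate shared expressions BEFORE the condition so the cache is populated.
            for (Map.Entry<String, Expr> entry : inCondition.entrySet()) {
                if (inProjection.containsKey(entry.getKey())) {
                    Generated g = generate(entry.getValue());
                    out.append(g.code);
                    cache.put(entry.getKey(), g);
                }
            }
        }
        Generated cond = generate(condition);
        Generated proj = generate(projection);
        out.append(cond.code)
           .append("if (").append(cond.resultTerm).append(") {\n")
           .append(proj.code)
           .append("  emit(").append(proj.resultTerm).append(");\n}\n");
        return out.toString();
    }

    /** Models: SELECT uuid() FROM t WHERE uuid() IS NOT NULL */
    static String demo(boolean useCache) {
        Expr condition = new Expr("IS_NOT_NULL", true, new Expr("UUID", false));
        Expr projection = new Expr("UUID", false);
        return new SubExprCacheSketch().generateProcessCode(condition, projection, useCache);
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // UUID() appears twice in the output
        System.out.println(demo(true));  // UUID() appears once and is reused
    }
}
```

With `useCache` set to `false`, the emitted string contains two `UUID()` calls, one for the filter and one for the projection. With it set to `true`, there is a single call whose result variable is used by both. Collecting operands first matters in this sketch: a nested shared expression is cached before the expression that contains it, so the outer one does not re-emit it.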
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added CountingNonDeterministicUdf - a non-deterministic UDF that returns 
an incrementing counter, used to detect if the UDF is called multiple times per 
row
   - Added testNonDeterministicUdfInWhereAndSelect test in CalcITCase that:
     - Runs a query with a non-deterministic UDF in both WHERE and SELECT 
clauses
     - Verifies the counter increments by 1 per row (not 2), confirming the UDF 
is only evaluated once
     - Without the fix: counter values are 1, 3, 5 (called twice per row) - 
test fails
     - With the fix: counter values are 1, 2, 3 (called once per row) - test 
passes
   - Verified existing CalcTest (95 tests) and NonDeterministicTest (14 tests) 
all pass
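
The counter-based check can be simulated in plain Java without a Flink runtime. The sketch below is an illustration only: `CountingUdf`, `evaluateTwice`, and `evaluateOnce` are hypothetical names, not the test code in this pull request. The absolute values depend on which call's result ends up in the output. This sketch emits the second call's value, so the double-evaluation case yields 2, 4, 6 rather than the 1, 3, 5 reported above, but the step of 2 between rows is the same signal.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation; CountingUdf is a hypothetical stand-in for the test UDF.
class CountingUdfSketch {

    /** Returns 1, 2, 3, ... on successive calls, so every extra call shows up in the output. */
    static final class CountingUdf {
        private int calls = 0;

        int eval() {
            return ++calls;
        }
    }

    /** Filter and projection each call the UDF (behavior described as the bug). */
    static List<Integer> evaluateTwice(int rows) {
        CountingUdf udf = new CountingUdf();
        List<Integer> output = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            boolean passes = udf.eval() > 0; // WHERE udf() > 0
            if (passes) {
                output.add(udf.eval());      // SELECT udf(): a second call per row
            }
        }
        return output;
    }

    /** The UDF is called once per row and the value is shared (intended behavior). */
    static List<Integer> evaluateOnce(int rows) {
        CountingUdf udf = new CountingUdf();
        List<Integer> output = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            int value = udf.eval();          // single evaluation
            if (value > 0) {                 // filter sees the same value
                output.add(value);           // projection reuses it
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(evaluateTwice(3)); // consecutive values differ by 2
        System.out.println(evaluateOnce(3));  // consecutive values differ by 1
    }
}
```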
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API, i.e., is any changed class annotated with 
@Public(Evolving): no
   - The serializers: no
   - The runtime per-record code paths (performance sensitive): yes - this 
change affects code generation for Calc operators with both filter and 
projection. The change adds caching, which should improve performance by 
avoiding duplicate UDF evaluations.
   - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
   - The S3 file system connector: no
   
   ## Documentation
   
   - Does this pull request introduce a new feature? no
   - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
