nateab opened a new pull request, #27506:
URL: https://github.com/apache/flink/pull/27506
…iple times
When a non-deterministic UDF (e.g., RAND(), UUID()) appears in both the
WHERE clause and SELECT clause of a query, the UDF was being evaluated multiple
times per row - once for the filter condition and again for the projection.
This caused incorrect behavior where a row that passes the filter condition
could have a different UDF value in the output.
This fix adds caching for common non-deterministic subexpressions between
filter and projection:
1. Added reusableSubExprCache to CodeGeneratorContext to cache pre-generated
expressions by their RexNode string representation.
2. Modified CalcCodeGenerator.generateProcessCode() to identify
non-deterministic expressions that appear in both condition and projection, and
pre-generate them BEFORE the filter condition is generated (critical for the
cache to be populated).
3. Modified ExprCodeGenerator.generateExpression() to check the cache first
and return the cached result (with NO_CODE) to avoid duplicate evaluation. Also
modified visitCall to use generateExpression for operand processing to ensure
cache lookups happen for subexpressions.
4. Added testNonDeterministicUdfInWhereAndSelect test to verify the fix.
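
The cache lookup in steps 1 to 3 can be illustrated with a small standalone sketch. It is not the Flink implementation: `GeneratedExpr`, `SubExprCacheSketch`, `preGenerate`, and `generate` are hypothetical names, and plain strings stand in for the RexNode string representation. It only shows the idea that a pre-generated expression is emitted once, and that later lookups return the same result variable with no code attached.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a generated expression: the statements that
// compute a value plus the name of the variable holding the result.
final class GeneratedExpr {
    final String code;        // statements to emit; empty means "already emitted"
    final String resultTerm;  // variable that holds the value

    GeneratedExpr(String code, String resultTerm) {
        this.code = code;
        this.resultTerm = resultTerm;
    }
}

// Hypothetical, simplified model of a code generator context with a
// sub-expression cache keyed by the expression's string form.
final class SubExprCacheSketch {
    static final String NO_CODE = "";

    private final Map<String, GeneratedExpr> cache = new HashMap<>();
    private int nextId = 0;

    // Generate code for an expression without consulting the cache.
    private GeneratedExpr generateFresh(String exprKey) {
        String term = "result$" + nextId++;
        return new GeneratedExpr("Object " + term + " = " + exprKey + ";", term);
    }

    // Pre-generate an expression and remember it under its string form.
    GeneratedExpr preGenerate(String exprKey) {
        GeneratedExpr expr = generateFresh(exprKey);
        cache.put(exprKey, expr);
        return expr;
    }

    // Cache hit: reuse the result variable and emit no code.
    // Cache miss: generate fresh code as usual.
    GeneratedExpr generate(String exprKey) {
        GeneratedExpr cached = cache.get(exprKey);
        if (cached != null) {
            return new GeneratedExpr(NO_CODE, cached.resultTerm);
        }
        return generateFresh(exprKey);
    }

    public static void main(String[] args) {
        SubExprCacheSketch ctx = new SubExprCacheSketch();
        GeneratedExpr pre = ctx.preGenerate("UUID()");
        GeneratedExpr inFilter = ctx.generate("UUID()");
        GeneratedExpr inProjection = ctx.generate("UUID()");
        System.out.println("pre-generated code: " + pre.code);
        System.out.println("filter uses:        " + inFilter.resultTerm);
        System.out.println("projection uses:    " + inProjection.resultTerm);
    }
}
```

In this model, the ordering requirement from step 2 is visible directly: if `generate` were called for the filter before `preGenerate` ran, the cache would be empty and the filter would get its own, separate evaluation.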
## What is the purpose of the change
This pull request fixes FLINK-29855 where non-deterministic UDFs (e.g.,
`RAND()`, `UUID()`) appearing in both the WHERE clause and SELECT clause of a
query were being evaluated multiple times per row. This caused
incorrect behavior where a row that passes the filter condition could have
a different UDF value in the output, because the UDF was called once for
filtering and again for projection.
For example, with the query:
```sql
SELECT uuid() FROM t WHERE uuid() IS NOT NULL
```
The UUID in the output would be different from the one used in the filter
check, which is semantically incorrect.
## Brief change log
- Added reusableSubExprCache to CodeGeneratorContext to cache pre-generated
expressions by their RexNode string representation
- Added preGenerateCommonNonDeterministicExprs() method in CalcCodeGenerator
to identify non-deterministic expressions that appear in both condition and
projection, and pre-generate them before processing
- Modified CalcCodeGenerator.generateProcessCode() to call the
pre-generation method BEFORE generating the filter condition (critical for
cache population)
- Modified ExprCodeGenerator.generateExpression() to check the cache first
and return cached results with NO_CODE to avoid duplicate evaluation
- Modified ExprCodeGenerator.visitCall() to use generateExpression() for
operand processing to ensure cache lookups happen for subexpressions
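
As a rough illustration of the identification step, the sketch below walks a toy expression tree, collects the string keys of non-deterministic calls in the condition and in the projection, and keeps the intersection. `Expr` and `CommonExprFinder` are hypothetical and much simpler than Calcite's RexNode. Which nodes the real `preGenerateCommonNonDeterministicExprs()` treats as candidates is not spelled out in this description, so collecting every node whose own operator is non-deterministic is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified expression node; not Calcite's RexNode.
final class Expr {
    final String name;
    final boolean deterministic;  // determinism of this operator itself
    final List<Expr> operands;

    Expr(String name, boolean deterministic, Expr... operands) {
        this.name = name;
        this.deterministic = deterministic;
        this.operands = Arrays.asList(operands);
    }

    // String form used as the lookup key, e.g. "IS NOT NULL(UUID())".
    String key() {
        if (operands.isEmpty()) {
            return name;
        }
        List<String> parts = new ArrayList<>();
        for (Expr op : operands) {
            parts.add(op.key());
        }
        return name + "(" + String.join(", ", parts) + ")";
    }
}

final class CommonExprFinder {
    // Collect the keys of every non-deterministic node in the tree.
    static void collect(Expr e, Set<String> out) {
        if (!e.deterministic) {
            out.add(e.key());
        }
        for (Expr op : e.operands) {
            collect(op, out);
        }
    }

    // Keys of non-deterministic expressions present in both the condition
    // and at least one projection expression.
    static Set<String> commonNonDeterministic(Expr condition, List<Expr> projection) {
        Set<String> inCondition = new LinkedHashSet<>();
        collect(condition, inCondition);
        Set<String> inProjection = new LinkedHashSet<>();
        for (Expr p : projection) {
            collect(p, inProjection);
        }
        inCondition.retainAll(inProjection);
        return inCondition;
    }

    public static void main(String[] args) {
        // Models: SELECT uuid(), a FROM t WHERE uuid() IS NOT NULL
        Expr condition = new Expr("IS NOT NULL", true, new Expr("UUID()", false));
        List<Expr> projection = Arrays.asList(new Expr("UUID()", false), new Expr("a", true));
        System.out.println(commonNonDeterministic(condition, projection));
    }
}
```

Only the keys in the returned set would need to be pre-generated; a non-deterministic call that appears on one side only is evaluated once anyway.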
## Verifying this change
This change added tests and can be verified as follows:
- Added CountingNonDeterministicUdf - a non-deterministic UDF that returns
an incrementing counter, used to detect if the UDF is called multiple times per
row
- Added testNonDeterministicUdfInWhereAndSelect test in CalcITCase that:
  - Runs a query with a non-deterministic UDF in both WHERE and SELECT
    clauses
  - Verifies the counter increments by 1 per row (not 2), confirming the UDF
    is only evaluated once
  - Without the fix: counter values are 1, 3, 5 (called twice per row) -
    test fails
  - With the fix: counter values are 1, 2, 3 (called once per row) - test
    passes
- Verified existing CalcTest (95 tests) and NonDeterministicTest (14 tests)
all pass
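
The counting idea behind the test can be reproduced outside Flink with a plain counter. The sketch below is not the test code: `DoubleEvaluationSketch` and its methods are hypothetical, and an `IntSupplier` backed by an `AtomicInteger` stands in for CountingNonDeterministicUdf. It evaluates the filter before the projection, which is an assumption, so the concrete values it produces in the unfixed path need not match the ones quoted above. The point is only the number of calls per row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

final class DoubleEvaluationSketch {
    // Unfixed behavior: the "UDF" is called once for the filter and again
    // for the projection, so the emitted value is not the filtered one.
    static List<Integer> withoutReuse(int rows, IntSupplier udf) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            int filterValue = udf.getAsInt();
            if (filterValue > 0) {
                out.add(udf.getAsInt()); // second call per row
            }
        }
        return out;
    }

    // Fixed behavior: the value is computed once and shared by the filter
    // and the projection.
    static List<Integer> withReuse(int rows, IntSupplier udf) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            int value = udf.getAsInt();
            if (value > 0) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger callsWithout = new AtomicInteger();
        withoutReuse(3, callsWithout::incrementAndGet);
        AtomicInteger callsWith = new AtomicInteger();
        List<Integer> reused = withReuse(3, callsWith::incrementAndGet);
        System.out.println("calls without reuse: " + callsWithout.get());
        System.out.println("calls with reuse:    " + callsWith.get());
        System.out.println("values with reuse:   " + reused);
    }
}
```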
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
@Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes - this
change affects code generation for Calc operators with both filter and
projection. The change adds caching which should improve performance
by avoiding duplicate UDF evaluations.
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable