raminqaf opened a new pull request, #26942:
URL: https://github.com/apache/flink/pull/26942
## What is the purpose of the change
This pull request fixes non-deterministic behavior in the MAP function when
duplicate keys are provided. The MAP function was producing inconsistent
results across different environments and test runs, causing CI failures and
breaking reproducibility guarantees.
The root cause was in the code generation logic in
`ScalarOperatorGens.scala`. Keys were deduplicated with `groupBy`, and the
subsequent `.keys` and `.values` extraction iterated in a non-deterministic
order. This broke the correspondence between the key and value arrays in the
generated code.
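The following standalone sketch illustrates the ordering contract involved. It is plain Scala, not the actual generated code, and the keys and values are arbitrary. The `Map` returned by `groupBy` makes no promise about iteration order, so that order cannot be relied on to follow the order of the MAP arguments. A `mutable.LinkedHashMap`, by contrast, iterates in first-insertion order.

```scala
import scala.collection.mutable

object GroupByOrderDemo {
  // Key/value arguments as they might appear in MAP[k1, v1, k2, v2, ...]
  // (arbitrary illustrative values; "a" appears twice).
  val args: Seq[(String, Int)] =
    Seq("e" -> 1, "a" -> 2, "d" -> 3, "b" -> 4, "c" -> 5, "a" -> 6)

  // Plain groupBy: the resulting Map's iteration order is unspecified,
  // so it need not match argument order.
  val unordered: Map[String, Seq[(String, Int)]] = args.groupBy(_._1)

  // Insertion-ordered grouping: keys iterate in first-occurrence order,
  // and values for a repeated key are appended in argument order.
  val ordered: mutable.LinkedHashMap[String, mutable.ArrayBuffer[Int]] = {
    val m = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[Int]]
    args.foreach { case (k, v) =>
      m.getOrElseUpdate(k, mutable.ArrayBuffer.empty[Int]) += v
    }
    m
  }

  def main(argv: Array[String]): Unit = {
    println(unordered.keys.mkString(", ")) // order not guaranteed
    println(ordered.keys.mkString(", "))   // e, a, d, b, c
  }
}
```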
## Brief change log
- Added `groupByOrdered` utility method in `GenerateUtils` that uses
`LinkedHashMap` to preserve insertion order during grouping operations
- Updated MAP function code generation in `ScalarOperatorGens.scala` to use
deterministic order-preserving deduplication instead of non-deterministic
`groupBy`
- Ensured "last value wins" semantics for duplicate keys by taking the last
occurrence in argument order
- Fixed key-value array correspondence in generated code to prevent
mismatched entries
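Below is a minimal sketch of the approach described in the change log. It rests on several assumptions. The helper name `groupByOrdered` comes from this PR, but the signature shown here is hypothetical. Plain `Int` values stand in for the generated key and value expressions. A deduplicated key keeps the position of its first occurrence, which is an assumption because the PR only specifies that the last value wins. Keys are grouped in insertion order and the last value in each group is kept. Keys and values are then split from one sequence of pairs, so the two arrays cannot fall out of step.

```scala
import scala.collection.mutable

object MapDedupSketch {
  // Hypothetical signature: groups elements by key, preserving the order in
  // which each key first appears (LinkedHashMap iterates in insertion order).
  def groupByOrdered[K, V](elems: Seq[V])(key: V => K): Seq[(K, Seq[V])] = {
    val groups = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[V]]
    elems.foreach(e => groups.getOrElseUpdate(key(e), mutable.ArrayBuffer.empty[V]) += e)
    groups.toSeq.map { case (k, vs) => (k, vs.toSeq) }
  }

  // Takes MAP arguments k1, v1, k2, v2, ... and returns deduplicated key and
  // value arrays. For a repeated key the last value wins, while the key keeps
  // its first-occurrence position (an assumption of this sketch).
  def dedupMapArgs(args: Seq[Int]): (Array[Int], Array[Int]) = {
    require(args.length % 2 == 0, "MAP expects an even number of arguments")
    val pairs = args.grouped(2).map { case Seq(k, v) => (k, v) }.toSeq
    val deduped = groupByOrdered(pairs)(_._1).map { case (k, group) => (k, group.last._2) }
    // Splitting a single sequence of pairs keeps keys(i) and values(i) aligned.
    val (keys, values) = deduped.unzip
    (keys.toArray, values.toArray)
  }

  def main(argv: Array[String]): Unit = {
    val (keys, values) = dedupMapArgs(Seq(1, 1, 1, 2)) // MAP[1, 1, 1, 2]
    println(keys.zip(values).map { case (k, v) => s"$k=$v" }.mkString("{", ", ", "}")) // {1=2}
  }
}
```

Because the grouping is keyed on insertion order rather than hash order, the same arguments always produce the same key and value arrays.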
## Verifying this change
This change is already covered by existing tests, such as:
- **MapFunctionITCase.test()** - Contains the specific failing test case
`map(f0, f0, f0, f1)` that was producing non-deterministic results
- The fix makes the previously flaky test `MAP[1, 1, 1, 2] → {1=2}`
consistently pass
- All existing MAP function tests continue to pass with deterministic
behavior
- Manual verification shows consistent results across multiple test runs and
environments
The change specifically addresses test failures that occurred when constant
folding was disabled, ensuring both code paths (optimized and runtime) produce
consistent results.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes** -
affects MAP function code generation, but with minimal performance impact
(deduplication remains O(n) in the number of MAP arguments)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable** - this is a bug
fix
--
This is an automated message from the Apache Git Service.