nateab opened a new pull request, #27505:
URL: https://github.com/apache/flink/pull/27505
## What is the purpose of the change
This pull request fixes a bug in `MiniBatchGroupAggFunction.finishBundle()`
where records for other keys in the same mini-batch bundle were silently
dropped whenever the bundle contained a key that had only retraction messages
and no existing accumulator state.
The root cause was a `return` where a `continue` was intended. For a key with
no state, leading retraction messages are filtered out of `inputRows`. If that
leaves `inputRows` empty, `return` exits `finishBundle()` entirely, so every
remaining key in the bundle is skipped.
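As a generic illustration of the root cause (not Flink code): in Java, `return` inside a `for` loop exits the enclosing method, whereas `continue` only skips the rest of the current iteration. In this sketch the class and method names are hypothetical, and a key listed in `emptyKeys` stands in for a key whose `inputRows` became empty after filtering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of `return` vs `continue` in a per-key loop.
// Not Flink code: a key in `emptyKeys` stands in for a key whose
// inputRows became empty after filtering.
public class ReturnVsContinue {

    static List<String> withReturn(List<String> keys, List<String> emptyKeys) {
        List<String> processed = new ArrayList<>();
        for (String key : keys) {
            if (emptyKeys.contains(key)) {
                return processed; // exits the method: later keys are never visited
            }
            processed.add(key);
        }
        return processed;
    }

    static List<String> withContinue(List<String> keys, List<String> emptyKeys) {
        List<String> processed = new ArrayList<>();
        for (String key : keys) {
            if (emptyKeys.contains(key)) {
                continue; // skips only this key and moves on to the next one
            }
            processed.add(key);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("aaa", "bbb", "ccc");
        List<String> empty = List.of("aaa");
        System.out.println(withReturn(keys, empty));   // []
        System.out.println(withContinue(keys, empty)); // [bbb, ccc]
    }
}
```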
## Brief change log
- Changed `return;` to `continue;` in
`MiniBatchGroupAggFunction.finishBundle()` so that processing moves on to the
next key instead of exiting the method
- Added the unit test `MiniBatchGroupAggFunctionTest`, which verifies the fix
directly by simulating a bundle with multiple keys in which the first key has
only retractions
## Verifying this change
This change added tests and can be verified as follows:
- Added
`MiniBatchGroupAggFunctionTest.testFinishBundleContinuesAfterEmptyInputRows()`,
which creates a mock bundle with three keys where the first key has only a
DELETE message (no existing state). The test verifies that:
  - **Without the fix**: 0 outputs are produced (all subsequent keys are
    dropped)
  - **With the fix**: 2 outputs are produced (keys "bbb" and "ccc" are
    processed correctly)
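The test scenario can be approximated with a simplified, self-contained model. This is a hypothetical sketch, not the actual `MiniBatchGroupAggFunction` or its test harness: the `Msg` record, the summing "aggregate", the `keysWithState` set, and the first key's name `"aaa"` are assumptions for illustration (records require Java 16+). The `buggyReturn` flag switches between the pre-fix `return` and the post-fix `continue`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the test scenario; not Flink's
// MiniBatchGroupAggFunction or its test harness.
public class FinishBundleScenario {

    enum Kind { INSERT, DELETE }

    record Msg(Kind kind, long value) {}

    /** Emits one "key=sum" output per key that still has input after filtering. */
    static List<String> finishBundle(
            Map<String, List<Msg>> bundle, Set<String> keysWithState, boolean buggyReturn) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Msg>> entry : bundle.entrySet()) {
            List<Msg> inputRows = new ArrayList<>(entry.getValue());
            if (!keysWithState.contains(entry.getKey())) {
                // No accumulator yet: leading retractions have nothing to retract.
                while (!inputRows.isEmpty() && inputRows.get(0).kind() == Kind.DELETE) {
                    inputRows.remove(0);
                }
                if (inputRows.isEmpty()) {
                    if (buggyReturn) {
                        return out; // pre-fix behavior: abandons the rest of the bundle
                    }
                    continue; // post-fix behavior: skips just this key
                }
            }
            long sum = 0;
            for (Msg m : inputRows) {
                sum += (m.kind() == Kind.INSERT) ? m.value() : -m.value();
            }
            out.add(entry.getKey() + "=" + sum);
        }
        return out;
    }

    /** Three keys in insertion order; the first has only a DELETE and no state. */
    static Map<String, List<Msg>> testBundle() {
        Map<String, List<Msg>> bundle = new LinkedHashMap<>(); // preserves key order
        bundle.put("aaa", List.of(new Msg(Kind.DELETE, 1)));
        bundle.put("bbb", List.of(new Msg(Kind.INSERT, 2)));
        bundle.put("ccc", List.of(new Msg(Kind.INSERT, 3)));
        return bundle;
    }

    public static void main(String[] args) {
        Set<String> noState = Set.of();
        System.out.println(finishBundle(testBundle(), noState, true));  // []
        System.out.println(finishBundle(testBundle(), noState, false)); // [bbb=2, ccc=3]
    }
}
```

With `buggyReturn = true`, the first key empties out and the method returns before visiting any other key, mirroring the 0 outputs described above. With `false`, the two remaining keys each produce an output.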
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
(mini-batch aggregation path; the change is a minimal control-flow fix)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable