arvindKandpal-ksolves opened a new pull request, #28479:
URL: https://github.com/apache/flink/pull/28479
## What is the purpose of the change
This pull request addresses FLINK-37351: it co-locates the sink writer and
committer for sinks that do not use a pre-commit topology. Co-location removes
the network hop between the writer and the committer, which improves
performance and could enable Statefun-style backchannels.
*Note: This PR takes over the stale work started in PR #27486. It fixes the
CI propagation issue from that PR by moving the co-location group key
assignment to the beginning of the translation lifecycle
(`translateInternal`).*
## Brief change log
- Assigned a default co-location group key in
`SinkTransformationTranslator.translateInternal()` for `SinkTransformation`s
where the sink implements `SupportsCommitter` but does NOT implement
`SupportsPreCommitTopology`.
  - Setting the key this early ensures that it propagates to all expanded
    child transformations (writer and committer).
- Ensured that user-specified co-location group keys are always respected
  and never overwritten.
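The rule above, and why it has to run before expansion, can be illustrated with a simplified model. This is a sketch under stated assumptions, not Flink's code: `Sink`, `SupportsCommitter` and `SupportsPreCommitTopology` are marker stand-ins for the Flink interfaces of the same names, `Node` stands in for a transformation, `expand()` assumes that expansion copies the sink's co-location key to the writer and committer exactly once, and the default key format `sink-writer-committer-<id>` is hypothetical because the PR does not say which key it assigns.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkColocationSketch {

    // Marker stand-ins for Flink's sink interfaces of the same names (hypothetical, simplified).
    interface Sink {}
    interface SupportsCommitter {}
    interface SupportsPreCommitTopology {}

    // Simplified stand-in for a transformation; only the fields this sketch needs.
    static final class Node {
        final int id;
        final String name;
        String coLocationGroupKey; // null means "no co-location constraint"

        Node(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // A committing sink without a pre-commit topology: eligible for the default key.
    static final class CommittingSink implements Sink, SupportsCommitter {}

    // Hypothetical key format: the PR does not say which default key it assigns.
    static String defaultKey(Node sinkNode) {
        return "sink-writer-committer-" + sinkNode.id;
    }

    // Models the early step in translateInternal(): only committer sinks without a
    // pre-commit topology get a default key, and a user-set key is never overwritten.
    static void assignDefaultCoLocationKey(Sink sink, Node sinkNode) {
        boolean eligible = sink instanceof SupportsCommitter
                && !(sink instanceof SupportsPreCommitTopology);
        if (eligible && sinkNode.coLocationGroupKey == null) {
            sinkNode.coLocationGroupKey = defaultKey(sinkNode);
        }
    }

    // Assumed expansion behaviour: the writer and committer copy the sink node's key
    // once, at expansion time; later changes to the sink node are not seen by them.
    static List<Node> expand(Node sinkNode) {
        List<Node> children = new ArrayList<>();
        children.add(new Node(sinkNode.id + 1, sinkNode.name + ": Writer"));
        children.add(new Node(sinkNode.id + 2, sinkNode.name + ": Committer"));
        for (Node child : children) {
            child.coLocationGroupKey = sinkNode.coLocationGroupKey;
        }
        return children;
    }

    public static void main(String[] args) {
        // Key assigned before expansion: both children inherit it.
        Node early = new Node(10, "MySink");
        assignDefaultCoLocationKey(new CommittingSink(), early);
        for (Node n : expand(early)) {
            System.out.println("early " + n.name + " -> " + n.coLocationGroupKey);
        }

        // Key assigned after expansion: the children never see it.
        Node late = new Node(20, "MySink");
        List<Node> lateChildren = expand(late);
        assignDefaultCoLocationKey(new CommittingSink(), late);
        for (Node n : lateChildren) {
            System.out.println("late  " + n.name + " -> " + n.coLocationGroupKey);
        }
    }
}
```

Under these assumptions, both `early` children print `sink-writer-committer-10`, while both `late` children print `null`: assigning the key at the start of translation is what lets it reach the expanded writer and committer.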
## Verifying this change
This change added tests and can be verified as follows:
- Added `testWriterAndCommitterColocatedWithoutPreCommitTopology` to
  verify that the writer and committer are co-located for sinks without a
  pre-commit topology.
- Added `testWriterAndCommitterNotColocatedWithPreCommitTopology` to
  verify that sinks with a pre-commit topology bypass this co-location logic.
- Added `testUserSpecifiedCoLocationGroupIsRespected` to verify that
  co-location groups set manually by the user are preserved throughout
  translation.
- Verified that the other sink properties (parallelism, slot sharing
  groups, etc.) still propagate to the expanded transformations.
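The three new tests cover the decision cases of the co-location rule. As a compact illustration, the rule can be restated as a pure function with one check per test named above. This is a hypothetical restatement, not a Flink API and not the PR's actual test code; `effectiveKey` and the placeholder keys `"d"` (default) and `"u"` (user) are introduced only for this sketch.

```java
public class ColocationDecisionTable {

    // Hypothetical pure-function view of the rule; not a Flink API.
    static String effectiveKey(boolean supportsCommitter,
                               boolean supportsPreCommitTopology,
                               String userKey,
                               String defaultKey) {
        if (userKey != null) {
            return userKey; // a user-specified key always wins
        }
        if (supportsCommitter && !supportsPreCommitTopology) {
            return defaultKey; // committer sink without a pre-commit topology
        }
        return null; // no co-location constraint
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Mirrors testWriterAndCommitterColocatedWithoutPreCommitTopology.
        check("d".equals(effectiveKey(true, false, null, "d")), "default key expected");
        // Mirrors testWriterAndCommitterNotColocatedWithPreCommitTopology.
        check(effectiveKey(true, true, null, "d") == null, "no key expected");
        // Mirrors testUserSpecifiedCoLocationGroupIsRespected.
        check("u".equals(effectiveKey(true, false, "u", "d")), "user key expected");
        System.out.println("all scenarios pass");
    }
}
```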
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes** *(Modifies task
scheduling/colocation for Sinks)*
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
*(Optimization/Bugfix)*
- If yes, how is the feature documented? **not applicable**
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (please specify the tool below)
--
This is an automated message from the Apache Git Service.