arvindKandpal-ksolves opened a new pull request, #28479:
URL: https://github.com/apache/flink/pull/28479

   ## What is the purpose of the change
   
   This pull request addresses FLINK-37351 by co-locating the writer and committer of sinks that do not use a pre-commit topology. Co-location removes the network hop between the two operators, which improves overall performance and could enable backchannels between them (StateFun-style).
   
   *Note: This PR takes over the stale work started in PR #27486. It fixes the co-location propagation issue that broke CI on that PR by moving the co-location group key assignment to the very beginning of the translation lifecycle (`translateInternal`).*
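   Why the assignment has to happen first can be shown with a toy model. The sketch below is hypothetical and does not use Flink's classes. It assumes that expanding a sink copies the parent's co-location key into the writer and committer once, at expansion time, so a key assigned afterwards never reaches them. `Node`, `expand` and the key strings are illustrative names only.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy model (hypothetical, not Flink's API): expanding a sink snapshots the
   // parent's properties into the writer and committer at expansion time.
   public class ColocationOrderingSketch {

       // Hypothetical stand-in for a transformation node.
       static final class Node {
           final String name;
           String coLocationGroupKey; // null means "not co-located"

           Node(String name) {
               this.name = name;
           }
       }

       // Expands a sink into writer and committer nodes, copying the parent's key once.
       static List<Node> expand(Node sink) {
           List<Node> children = new ArrayList<>();
           for (String role : new String[] {"Writer", "Committer"}) {
               Node child = new Node(sink.name + ": " + role);
               child.coLocationGroupKey = sink.coLocationGroupKey;
               children.add(child);
           }
           return children;
       }

       public static void main(String[] args) {
           // Key assigned after expansion: the children never see it.
           Node late = new Node("late-sink");
           List<Node> lateChildren = expand(late);
           late.coLocationGroupKey = "late-key";

           // Key assigned before expansion, as translateInternal() now does: children inherit it.
           Node early = new Node("early-sink");
           early.coLocationGroupKey = "early-key";
           List<Node> earlyChildren = expand(early);

           for (Node n : lateChildren) {
               System.out.println(n.name + " -> " + n.coLocationGroupKey);
           }
           for (Node n : earlyChildren) {
               System.out.println(n.name + " -> " + n.coLocationGroupKey);
           }
       }
   }
   ```

   In this model, the late-assigned key leaves both children with `null`. The early-assigned key is inherited by the writer and the committer alike.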
   
   ## Brief change log
   
     - Assigned a default co-location group key in `SinkTransformationTranslator.translateInternal()` to every `SinkTransformation` whose sink implements `SupportsCommitter` but not `SupportsPreCommitTopology`.
     - Setting the key this early ensures that it propagates to all expanded child transformations (writer and committer).
     - User-specified co-location group keys are respected and never overwritten.
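   A minimal sketch of the decision rule from the change log, written in plain Java. `SupportsCommitter` and `SupportsPreCommitTopology` are local marker interfaces here. They stand in for Flink's generic interfaces of the same name, which also declare methods. `SinkNode` is a hypothetical stand-in for `SinkTransformation`. The `sink-colocation-<id>` format is a placeholder and not the key the PR actually generates.

   ```java
   import java.util.Objects;

   public class DefaultCoLocationSketch {

       // Local marker interfaces standing in for Flink's SupportsCommitter and
       // SupportsPreCommitTopology (the real ones are generic and declare methods).
       interface SupportsCommitter {}

       interface SupportsPreCommitTopology {}

       // Hypothetical stand-in for SinkTransformation: an id, the sink, and its key.
       static final class SinkNode {
           final int id;
           final Object sink;
           String coLocationGroupKey; // null until the user or the translator sets it

           SinkNode(int id, Object sink, String userKey) {
               this.id = id;
               this.sink = Objects.requireNonNull(sink);
               this.coLocationGroupKey = userKey;
           }
       }

       // Only committing sinks without a pre-commit topology get a default key,
       // and an existing (user-specified) key is never replaced.
       static void maybeAssignDefaultKey(SinkNode node) {
           boolean committing = node.sink instanceof SupportsCommitter;
           boolean preCommit = node.sink instanceof SupportsPreCommitTopology;
           if (committing && !preCommit && node.coLocationGroupKey == null) {
               node.coLocationGroupKey = "sink-colocation-" + node.id; // placeholder format
           }
       }

       static final class PlainCommittingSink implements SupportsCommitter {}

       static final class PreCommitSink implements SupportsCommitter, SupportsPreCommitTopology {}

       public static void main(String[] args) {
           SinkNode[] nodes = {
               new SinkNode(1, new PlainCommittingSink(), null), // gets a default key
               new SinkNode(2, new PreCommitSink(), null),       // bypassed
               new SinkNode(3, new PlainCommittingSink(), "my-group"), // user key kept
               new SinkNode(4, new Object(), null),              // writer-only sink, bypassed
           };
           for (SinkNode n : nodes) {
               maybeAssignDefaultKey(n);
               System.out.println(n.id + " -> " + n.coLocationGroupKey);
           }
       }
   }
   ```

   The `null` check before assigning is what keeps a user-specified group intact (case 3). A sink that does not implement `SupportsCommitter` at all (case 4) is left untouched.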
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added `testWriterAndCommitterColocatedWithoutPreCommitTopology` to verify that the writer and committer are co-located for sinks without a pre-commit topology.
     - Added `testWriterAndCommitterNotColocatedWithPreCommitTopology` to verify that sinks with a pre-commit topology bypass this co-location logic.
     - Added `testUserSpecifiedCoLocationGroupIsRespected` to verify that co-location groups defined manually by the user are preserved throughout translation.
     - Verified that the other properties propagated to the expanded transformations (parallelism, slot sharing groups, etc.) are still applied correctly.
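   A hypothetical helper illustrating what the new tests could assert on the expanded writer and committer. `OperatorProps` stands in for whatever the real tests read from the generated graph, and the values in `main` are made up. Requiring both operators to agree on parallelism and slot sharing group is an assumption of this sketch, based on the last bullet above.

   ```java
   import java.util.Objects;

   // Hypothetical test helper (not Flink's test utilities).
   public class ColocationCheckSketch {

       // Hypothetical snapshot of one expanded operator's relevant properties.
       static final class OperatorProps {
           final String coLocationGroupKey; // null means "not co-located"
           final int parallelism;
           final String slotSharingGroup;

           OperatorProps(String coLocationGroupKey, int parallelism, String slotSharingGroup) {
               this.coLocationGroupKey = coLocationGroupKey;
               this.parallelism = parallelism;
               this.slotSharingGroup = slotSharingGroup;
           }
       }

       // Co-located means both operators carry the same non-null key.
       static boolean colocated(OperatorProps writer, OperatorProps committer) {
           return writer.coLocationGroupKey != null
                   && writer.coLocationGroupKey.equals(committer.coLocationGroupKey);
       }

       // The other propagated properties must still agree between the two operators.
       static boolean propertiesAgree(OperatorProps writer, OperatorProps committer) {
           return writer.parallelism == committer.parallelism
                   && Objects.equals(writer.slotSharingGroup, committer.slotSharingGroup);
       }

       public static void main(String[] args) {
           // One writer/committer pair per test scenario; values are illustrative only.
           OperatorProps[][] pairs = {
               {new OperatorProps("default-key", 4, "default"), new OperatorProps("default-key", 4, "default")},
               {new OperatorProps(null, 4, "default"), new OperatorProps(null, 4, "default")},
               {new OperatorProps("my-group", 2, "custom"), new OperatorProps("my-group", 2, "custom")},
           };
           String[] labels = {"no pre-commit", "pre-commit", "user key"};
           for (int i = 0; i < pairs.length; i++) {
               OperatorProps w = pairs[i][0];
               OperatorProps c = pairs[i][1];
               System.out.println(labels[i] + ": colocated=" + colocated(w, c)
                       + ", agree=" + propertiesAgree(w, c));
           }
       }
   }
   ```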
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes** *(changes task scheduling/co-location for sinks)*
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no** *(optimization/bug fix)*
     - If yes, how is the feature documented? **not applicable**
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, please
   change the checkbox below to `[X]` followed by the name of the tool, and uncomment the
   "Generated-by" line. See the ASF Generative Tooling Guidance for details:
   https://www.apache.org/legal/generative-tooling.html
   
   You are responsible for the quality and correctness of every change in this PR
   regardless of the tooling used. Low-effort AI-generated PRs will be closed. See
   AGENTS.md for the full guidance.
   -->
   
   - [ ] Yes (please specify the tool below)
   
   <!--
   Generated-by: [Tool Name and Version]
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
