[ 
https://issues.apache.org/jira/browse/FLINK-39980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18091072#comment-18091072
 ] 

Taranpreet Kaur commented on FLINK-39980:
-----------------------------------------

 Hi [~bli],

I would like to work on this Jira. We recently worked with DynamicKafkaSource 
and gained some familiarity with its code base. Please assign this Jira to me.

> Implement split rebalance/reassignment support in DynamicKafkaSource after 
> metadata changes
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39980
>                 URL: https://issues.apache.org/jira/browse/FLINK-39980
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka
>    Affects Versions: 2.3.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>
> DynamicKafkaSource can remove clusters/topics when stream metadata changes. 
> After removal, some source subtasks may become empty while other subtasks 
> still own active partitions. This can happen even when {{parallelism <= 
> total active partitions}}, because the current assignment is inherited from 
> prior split ownership and does not rebalance active splits across the 
> remaining readers.
> Flink has added a source contract for enumerators to regain a global split 
> view and perform reassignment/rebalancing after recovery: 
> {{apache/flink#27149}} / FLIP-537. DynamicKafkaSource should implement this 
> contract once it is available in our Flink runtime.
> *Goals*
> Implement the new rebalance/reassignment interface in DynamicKafkaSource so 
> that after metadata removal or recovery, active splits are redistributed 
> across available readers.
> *Expected Behavior*
> When active Kafka partitions remain and {{parallelism <= total active 
> partitions}}, DynamicKafkaSource should avoid empty source subtasks by 
> rebalancing active splits across readers.
> When {{total active partitions < parallelism}}, empty subtasks are 
> expected, but they should be handled by idleness logic and autoscaler 
> calibration.
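
The expected behavior quoted above can be illustrated with a minimal sketch. 
It is not the connector's implementation and does not use the FLIP-537 
interface; the class SplitRebalanceSketch, the method rebalance, and the 
split names are hypothetical, and plain round-robin is only one possible 
redistribution policy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: not part of Flink or the Kafka connector. */
final class SplitRebalanceSketch {

    /**
     * Spreads the active splits round-robin over reader subtask
     * indices 0..parallelism-1, ignoring any prior split ownership.
     */
    static Map<Integer, List<String>> rebalance(List<String> activeSplits, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        Map<Integer, List<String>> assignment = new LinkedHashMap<>();
        for (int reader = 0; reader < parallelism; reader++) {
            assignment.put(reader, new ArrayList<>());
        }
        for (int i = 0; i < activeSplits.size(); i++) {
            assignment.get(i % parallelism).add(activeSplits.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // parallelism <= total active partitions: every reader gets a split.
        System.out.println(rebalance(List.of("t-0", "t-1", "t-2", "t-3"), 3));
        // total active partitions < parallelism: the last reader stays empty.
        System.out.println(rebalance(List.of("t-0", "t-1"), 3));
    }
}
```

With a global view of the active splits, no reader is left empty as long as 
the number of active splits is at least the parallelism. A real 
implementation would probably also try to limit split movement between 
readers, which this sketch does not attempt.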



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
