[ https://issues.apache.org/jira/browse/FLINK-39980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18091072#comment-18091072 ]
Taranpreet Kaur commented on FLINK-39980:
-----------------------------------------
Hi [~bli],
I would like to work on this Jira. We recently worked with the dynamic Kafka
source and gained some familiarity with its code base. Please assign this Jira
to me.
> Implement split rebalance/reassignment support in DynamicKafkaSource after
> metadata changes
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-39980
> URL: https://issues.apache.org/jira/browse/FLINK-39980
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Kafka
> Affects Versions: 2.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
>
> DynamicKafkaSource can remove clusters/topics when stream metadata changes.
> After removal, some source subtasks may become empty while other subtasks
> still own active partitions. This can happen even when {{parallelism <=
> total active partitions}}, because the current assignment is inherited from
> prior split ownership and does not rebalance active splits across the
> remaining readers.
> Flink has added a source contract for enumerators to regain a global split
> view and perform reassignment/rebalancing after recovery:
> {{apache/flink#27149}} / FLIP-537. DynamicKafkaSource should implement this
> contract once it is available in our Flink runtime.
> *Goals*
> Implement the new rebalance/reassignment interface in DynamicKafkaSource so
> that after metadata removal or recovery, active splits are redistributed
> across available readers.
> *Expected Behavior*
> When active Kafka partitions remain and {{parallelism <= total active
> partitions}}, DynamicKafkaSource should avoid empty source subtasks by
> rebalancing active splits across readers.
> When {{total active partitions < parallelism}}, empty subtasks are
> expected, but they should be handled by idleness logic and autoscaler
> calibration.
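
For illustration only, the expected behavior quoted above could be sketched
roughly as follows. This is a minimal sketch, not the FLIP-537 interface and
not DynamicKafkaSource code: the class SplitRebalanceSketch, the methods
rebalance and emptyReaders, and the use of plain strings as split ids are all
hypothetical names chosen for this example. It assumes a simple round-robin
policy, which is only one possible way to redistribute active splits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; not part of Flink or the Kafka connector. */
final class SplitRebalanceSketch {

    /**
     * Distributes the active splits round-robin over reader subtasks
     * 0..parallelism-1, ignoring any prior split ownership.
     */
    static Map<Integer, List<String>> rebalance(List<String> activeSplits, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int reader = 0; reader < parallelism; reader++) {
            assignment.put(reader, new ArrayList<>());
        }
        for (int i = 0; i < activeSplits.size(); i++) {
            assignment.get(i % parallelism).add(activeSplits.get(i));
        }
        return assignment;
    }

    /** Counts reader subtasks that ended up without any split. */
    static long emptyReaders(Map<Integer, List<String>> assignment) {
        return assignment.values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // parallelism <= total active partitions: no reader should be empty.
        Map<Integer, List<String>> balanced =
                rebalance(List.of("p0", "p1", "p2", "p3", "p4"), 3);
        System.out.println(balanced + " empty=" + emptyReaders(balanced));

        // total active partitions < parallelism: some readers stay empty.
        Map<Integer, List<String>> sparse = rebalance(List.of("p0", "p1"), 4);
        System.out.println(sparse + " empty=" + emptyReaders(sparse));
    }
}
```

With this policy, no reader is left empty whenever the number of active splits
is at least the parallelism. When there are fewer splits than readers, the
surplus readers stay empty, which matches the case the description leaves to
idleness logic and autoscaler calibration.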
--
This message was sent by Atlassian Jira
(v8.20.10#820010)