[ https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488735#comment-16488735 ]

ASF GitHub Bot commented on FLINK-9303:
---------------------------------------

Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190529981
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -374,8 +385,8 @@ void setOffsetsToCommit(
         * <p>This method is exposed for testing purposes.
         */
        @VisibleForTesting
    -   void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
    -           if (newPartitions.size() == 0) {
    +   void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
    --- End diff ---
    
    I thought about it, but my only concern is the case where we'd have both partitions to add and partitions to remove.
    `consumerCallBridge.assignPartitions()` takes the whole new list of partitions, so in that case we would need to wait for the first assignment (e.g. adding the new partitions) before doing the second assignment (e.g. removing partitions) in order to end up with a consistent list of partitions.
    I think we should try to have only one call to `consumerCallBridge.assignPartitions()`.

    Maybe I could refactor the part where partitions are removed from the old partitions into a separate private method like `removeFromOldPartitions()`?

    What do you think?
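
    For illustration only, here is a rough sketch of the single-call idea, written against the plain Kafka consumer API rather than the actual `KafkaConsumerThread` / `consumerCallBridge` code; the class and method names below are made up for the example:

        import java.util.ArrayList;
        import java.util.HashSet;
        import java.util.List;
        import java.util.Set;

        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;

        final class SingleAssignmentSketch {

            // Compute "current assignment minus removed plus new" up front, so the
            // consumer is re-assigned exactly once instead of add-then-remove.
            static void reassign(
                    KafkaConsumer<?, ?> consumer,
                    List<TopicPartition> newPartitions,
                    Set<TopicPartition> partitionsToBeRemoved) {

                // Start from what the consumer currently has assigned...
                Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());

                // ...drop the partitions that became unavailable (the
                // removeFromOldPartitions() step suggested above)...
                assignment.removeAll(partitionsToBeRemoved);

                // ...add the newly discovered partitions...
                assignment.addAll(newPartitions);

                // ...and hand the complete, consistent list over in one call.
                consumer.assign(new ArrayList<>(assignment));
            }
        }

    That way there is no window where the consumer is assigned an inconsistent partition list.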


> Unassign partitions from Kafka client if partitions become unavailable
> ----------------------------------------------------------------------
>
>                 Key: FLINK-9303
>                 URL: https://issues.apache.org/jira/browse/FLINK-9303
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer currently has no notion of "closed" 
> partitions, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the mail thread linked above.


