[ https://issues.apache.org/jira/browse/KAFKA-984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13715843#comment-13715843 ]

Guozhang Wang commented on KAFKA-984:
-------------------------------------

Approach:

1. In ZookeeperConsumerConnector.reinitializeConsumer, add three additional 
arguments: 

partial: Boolean

addedTopics: Seq[String]

deletedTopics: Seq[String]

2. In WildcardStreamsHandler.handleTopicEvent, call reinitializeConsumer 
with partial = true and with addedTopics and deletedTopics set accordingly.

3. In ZookeeperConsumerConnector.reinitializeConsumer, if partial == true, 
branch around the code starting from

    // map of {topic -> Set(thread-1, thread-2, ...)}
    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
      topicCount.getConsumerThreadIdsPerTopic

to

    val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
    groupedByTopic.foreach(e => {
      val topic = e._1
      val streams = e._2.map(_._2._2).toList
      topicStreamsMap += (topic -> streams)
      debug("adding topic %s and %d streams to map.".format(topic, 
streams.size))
    })

Instead, just update topicThreadIdAndQueues and topicStreamsMap for the 
added topics.

* Note that we currently do not handle deleted topics, and this issue will not 
be fixed in this JIRA.
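The per-topic grouping that the partial path reuses can be sketched in isolation. The types here are simplified stand-ins: strings replace the actual queue and KafkaStream objects, and the object/method names are illustrative, not the real Kafka identifiers.

```scala
object PartialReinitSketch {
  // Simplified stand-ins for the real types: the key is (topic, threadId)
  // and the value is (queue, stream); strings replace the actual
  // BlockingQueue and KafkaStream objects.
  type ThreadQueueStreamPairs = Map[(String, String), (String, String)]

  // Mirror of the groupBy in reinitializeConsumer: group the stream
  // objects by topic so topicStreamsMap can be updated per topic.
  def streamsByTopic(pairs: ThreadQueueStreamPairs): Map[String, List[String]] =
    pairs.groupBy(_._1._1).map { case (topic, entries) =>
      topic -> entries.values.map(_._2).toList.sorted
    }
}
```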

4. Add another function syncedPartialRebalance to ZKRebalancerListener, and 
make reinitializeConsumer call this function when partial == true. 
syncedPartialRebalance will use the same rebalanceLock as syncedRebalance.

5. ZKRebalancerListener keeps an in-memory list of the topics it is 
currently consuming from.

6. syncedPartialRebalance, which calls rebalanceForTopic, first checks for 
any topic changes by reading ZK and comparing against the in-memory list. If 
no added or deleted topics are found, it returns directly.
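This check amounts to two set differences. A minimal sketch, with illustrative names rather than the actual Kafka identifiers:

```scala
object TopicDiffSketch {
  // Compare the in-memory topic list against the topics read from ZK,
  // returning (addedTopics, deletedTopics). If both sets are empty the
  // partial rebalance can return immediately.
  def diff(inMemory: Set[String], fromZk: Set[String]): (Set[String], Set[String]) =
    (fromZk -- inMemory, inMemory -- fromZk)
}
```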

6.1. For each deleted topic, simply call closeFetchers and delete the 
topic's ownership and offsets in ZK.

6.2. For each added topic, read the number of consumers in the group from ZK 
(done only once for all added topics) and the number of partitions of the 
topic, then assign the partitions to consumers using the same deterministic 
algorithm as a full rebalance.
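The deterministic assignment above can be sketched range-style, the way the high-level consumer's rebalance divides partitions: sort the consumer ids, give each floor(nPartitions / nConsumers), and spread the remainder over the first consumers. This standalone version is a sketch, not the actual rebalance code:

```scala
object RangeAssignSketch {
  // Deterministically assign numPartitions partitions of one topic to the
  // sorted consumer ids: each gets numPartitions / n partitions, and the
  // first numPartitions % n consumers get one extra.
  def assign(numPartitions: Int, consumerIds: Seq[String]): Map[String, Seq[Int]] = {
    require(consumerIds.nonEmpty, "need at least one consumer")
    val sorted = consumerIds.sorted
    val base   = numPartitions / sorted.size
    val extra  = numPartitions % sorted.size
    sorted.zipWithIndex.map { case (cid, i) =>
      val start = i * base + math.min(i, extra)
      val count = base + (if (i < extra) 1 else 0)
      cid -> (start until start + count)
    }.toMap
  }
}
```

Because every consumer sorts the same ids and partition counts, each one computes the same assignment without coordination.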

6.3. Try writing the ownership of all added topics to ZK. If the writes 
succeed, update the fetchers (start new threads) and return true; otherwise 
return false.
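The conditional write above can be sketched with an in-memory map standing in for the group's ZK ownership nodes. Note the rollback-on-conflict behavior is an assumption about how a failed partial claim would be cleaned up; the proposal only says to return false:

```scala
import scala.collection.mutable

object OwnershipWriteSketch {
  // In-memory stand-in for the group's ZK ownership nodes,
  // keyed by (topic, partition) -> owning consumer id.
  val ownership = mutable.Map.empty[(String, Int), String]

  // Try to claim every (topic, partition) for its consumer. Returns true
  // only if no partition was already owned by a different consumer; on a
  // conflict, our own claims are rolled back (assumed cleanup) and the
  // caller falls back to a full rebalance.
  def tryClaim(claims: Seq[((String, Int), String)]): Boolean = {
    val written = mutable.ListBuffer.empty[(String, Int)]
    for ((key, owner) <- claims) {
      ownership.get(key) match {
        case Some(existing) if existing != owner =>
          written.foreach(ownership.remove)
          return false
        case _ =>
          ownership(key) = owner
          written += key
      }
    }
    true
  }
}
```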

Considerations:

1. If a topic change and a consumer/broker change happen at the same time, 
two consumers could trigger the corresponding syncedRebalance and 
syncedPartialRebalance in different orders. In this case we would prefer 
syncedPartialRebalance to fail fast so that every consumer enters the 
syncedRebalance phase. One possible optimization is to check 
isWatcherTriggered at the beginning of syncedPartialRebalance and, if it is 
set, return false directly. Also, do not retry inside syncedPartialRebalance.
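The fail-fast check can be sketched as follows; here isWatcherTriggered is passed in as a plain AtomicBoolean standing in for the listener's flag, and doPartialRebalance stands in for the actual partial-rebalance body:

```scala
import java.util.concurrent.atomic.AtomicBoolean

object FailFastSketch {
  // If a full-rebalance watcher has already fired, bail out immediately
  // (no retries) so every consumer falls through to syncedRebalance;
  // otherwise run the partial rebalance once and report its result.
  def syncedPartialRebalance(isWatcherTriggered: AtomicBoolean,
                             doPartialRebalance: () => Boolean): Boolean =
    if (isWatcherTriggered.get()) false
    else doPartialRebalance()
}
```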

2. Stopping fetchers for only certain topics, i.e., passing a partial 
threadIdMap to closeFetchers(cluster: Cluster, messageStreams: 
Map[String, List[KafkaStream[_,_]]], relevantTopicThreadIdsMap: Map[String, 
Set[String]]), has not been done before; it is not clear whether this works 
well.

                
> Avoid a full rebalance in cases when a new topic is discovered but 
> container/broker set stay the same
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-984
>                 URL: https://issues.apache.org/jira/browse/KAFKA-984
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.8
>
>
> Currently a full rebalance will be triggered on high level consumers even 
> when just a new topic is added to ZK. Better avoid this behavior but only 
> rebalance on this newly added topic.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
