[ https://issues.apache.org/jira/browse/KAFKA-343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13404797#comment-13404797 ]
Jun Rao commented on KAFKA-343:
-------------------------------

Thanks for the patch. Overall, the patch looks promising. Some comments:

1. KafkaController:
1.1 Is the following global import needed since it's imported in a nested class already?
    import collection.JavaConversions._
1.2 ControllerChannelManager: brokers in allBrokers are added twice in the constructor.
1.3 BrokerChangeListener: If you want to do something for each item in a collection but don't need an output collection, use collection.foreach instead of collection.map. For example, the following usages of map should be changed to foreach.
    addedBrokersSeq.map(controllerChannelManager.addBroker(_))
    deletedBrokerIds.map(controllerChannelManager.removeBroker(_))
1.4 onBrokerChange():
1.4.1 We should use the cached replica assignment allTopicPartitionAssignment in the controller, instead of re-reading from ZK on every broker failure.
1.4.2 Instead of sending a leaderAndISRRequest per topicPartition, we should batch them and send only 1 leaderAndISRRequest per broker for each onBrokerChange() call.
1.4.3 If no assigned replica is in liveBrokerIds, we should log an error for the affected topic/partition.
1.4.4 Instead of using curZkPathVersion + 1 as the new version of a ZK path, we should use the new ZK version returned in conditionalUpdate. There is no guarantee that the new version is always curZkPathVersion + 1.
1.4.5 We should add an info level log for the final leaderAndISRRequest that we send to each broker.
1.4.6 If we select a new leader that's not in ISR, we should log a warning and indicate that it can cause potential data loss.
1.4.7 ZkUtils.getLeaderAndISRForPartition may return None. We need to handle it.
1.5 TopicChangeListener.handleChildChange(): We only need to update allTopicPartitionAssignment for the new and deleted topics.
1.6 handleNewTopics(): We should use allTopicPartitionAssignment, instead of reading from ZK.
1.7 allTopicPartitionAssignment: the replicas should be a list, instead of a set.
This is because the ordering of the replicas is important and the 1st replica is the preferred one. We won't make use of it in this jira, but will in the future.
1.8 initLeaders():
1.8.1 We should log a warning if no assigned replica is live.
1.8.2 Similar to onBrokerChange(), we should try to batch LeaderAndISRRequest to each broker.
1.8.3 We should set noInit in LeaderAndISRRequest. IsInit will only be used in leaderAndISRRecovery().
1.9 In all ZK event handlers, let's add try/catch for all throwables and log them.
1.10 leaderAndISRRecovery(): we should try to batch LeaderAndISRRequest to each broker.
1.11 tryToBecomeController(): I would add an explicit return keyword to each of the return statements to make things clear, since they are not the last expression in the function.
1.12 handleDeletedTopics():
1.12.1 Deleted partitions are already removed from allTopicPartitionAssignment. So we need to pass the assignment in.
1.12.2 We should try to batch stopReplicaRequest to each broker.

2. KafkaApis.handleLeaderAndISRRequest(): We don't need to check if(leaderAndISR.ISR.contains(brokerId)). This allows the controller to bootstrap a new replica (from changing #partitions or partition assignments).

3. LeaderAndISRRequest: It's better to change isInit to boolean and send the right byte under the covers.

4. KafkaZookeeper.initLocalReplicas(): This should remove local replicas no longer assigned to this broker. Also, this function needs to be called before registerBrokerInZk(). Otherwise, a deleted topic can be recreated and assigned to this broker before the local replica is cleaned up.

5. AdminUtils: Remove the following line:
    scala.Seq

6. KafkaRequestHandler: Doesn't seem to require any change.

7. LeaderAndISRRequest: It seems that CurrentVersion is more appropriate than InitialLeaderAndISRRequestVersion since the version could evolve in the future.

8. Partition.updateISR(): This function is used in 3 places: ReplicaManager.makeLeader, maybeShrinkISR and recordFollowerPosition.
In the first case, we only need to update the ISR in cache since we know the controller already updated the ISR in ZK. However, the latter 2 cases are initiated by the leader. So we need to update the ISR in both cache and ZK. So, we will need 2 versions of updateISR(), one just updating the cache, another updating both cache and ZK.

9. Replica.isLeader(): need to handle the case when the leader doesn't exist.

10. ReplicaManager:
10.1 The indentation of some logging statements that span 2 lines is wrong.
10.2 stopReplica: The deletion of a log should probably be done in LogManager with a new function deleteLog.

11. StopReplicaRequest: It seems that CurrentVersion is more appropriate than InitialVersion since the version could evolve in the future.

12. ControllerBasicTest: Why is the test commented out?

13. LeaderElectionTest: Why is testEpoch() removed?

14. log4j.properties: Is the change just for debugging?

> revisit the become leader and become follower state change operations using V3 design
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-343
>                 URL: https://issues.apache.org/jira/browse/KAFKA-343
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>             Fix For: 0.8
>
>         Attachments: kafka_343.patch
>
>
> We need to reimplement become leader/follower using the controller model described in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
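
To make comments 1.3 and 1.4.2 a bit more concrete, here is a rough sketch of grouping per-partition leader/ISR state by broker, so that each broker would receive a single request per onBrokerChange() call. It is not taken from the patch: TopicPartition, LeaderAndIsr and batchByBroker are hypothetical names chosen for illustration, and println stands in for whatever the controller uses to send the request.

```scala
// Hypothetical stand-ins for the patch's types; names and fields are assumptions.
case class TopicPartition(topic: String, partition: Int)
case class LeaderAndIsr(leader: Int, isr: List[Int])

object BatchingSketch {
  // Builds one batch per broker: brokerId -> (partition -> leader/ISR info).
  // Every replica assigned to a partition receives that partition's entry.
  def batchByBroker(
      assignment: Map[TopicPartition, List[Int]],
      leaderAndIsr: Map[TopicPartition, LeaderAndIsr]
  ): Map[Int, Map[TopicPartition, LeaderAndIsr]] = {
    val perReplica = for {
      (tp, replicas) <- assignment.toList
      info           <- leaderAndIsr.get(tp).toList // skip partitions with no leader info
      brokerId       <- replicas
    } yield (brokerId, tp, info)

    perReplica
      .groupBy(_._1)
      .map { case (brokerId, entries) =>
        brokerId -> entries.map { case (_, tp, info) => tp -> info }.toMap
      }
  }

  def main(args: Array[String]): Unit = {
    val assignment = Map(
      TopicPartition("t", 0) -> List(1, 2),
      TopicPartition("t", 1) -> List(2, 3)
    )
    val state = Map(
      TopicPartition("t", 0) -> LeaderAndIsr(1, List(1, 2)),
      TopicPartition("t", 1) -> LeaderAndIsr(2, List(2, 3))
    )
    // foreach, not map: only the side effect of "sending" each batch is wanted.
    batchByBroker(assignment, state).foreach { case (brokerId, batch) =>
      println(s"broker $brokerId <- ${batch.size} partition(s)")
    }
  }
}
```

The same grouping would apply to the batching suggested in 1.8.2, 1.10 and 1.12.2; only the payload type would differ.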