[ https://issues.apache.org/jira/browse/KAFKA-499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456641#comment-13456641 ]
Jun Rao commented on KAFKA-499:
-------------------------------

Thanks for patch v2. It's a big change, but it's changing things in the right direction: it makes the logic cleaner and covers some of the corner cases that were missed before. Excellent job! Some comments:

20. ReplicaStateMachine:
20.1 handleStateChanges(): There is no need to read the partition assignment from ZK. We can just use the cached version in the controller context.
20.2 handleStateChange(): We can use leaderAndIsr directly as the last parameter of the following call: brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, controllerContext.allLeaders((topic, partition)))
20.3 handleStateChange(): For the OfflineReplica case, we should send new LeaderAndIsr requests to the leader broker. We should also increment the leader epoch. In theory, we only need to generate new LeaderAndIsr requests for partitions that lost a follower, not for partitions that lost the leader (the latter are handled in the partition state change already). For those partitions we will be sending the same LeaderAndIsr request twice, once through the partition state transition and once through the replica state transition, but we don't really need to optimize this right now since the makeLeader process on a broker is very cheap.
20.4 handleStateChange(): The OnlineReplica case is a bit special in that its valid previous states include OnlineReplica itself; no other transition allows moving from a state to itself. This seems to be because initializeReplicaState() already initializes some replicas to the online state. In general, though, should we allow self-transitions for all states? That seems safer and guards against the case where we somehow redo the same transition (a small sketch of what this could look like follows item 22 below).
20.5 Before this patch, we send LeaderAndIsr requests with the INIT flag in two cases: during controller failover and during broker startup. The only purpose of the INIT flag is really to ensure that a broker cleans up deleted topics properly. Thinking about this more, using INIT is a hacky way to handle topic deletion now that we do have a clean state machine. A better approach is probably a separate topic-deletion path that only deletes a topic once every broker has received the stop-replica request. On both controller failover and replica startup, the new controller can find all topics that still need to be deleted and resend the stop-replica requests. So in this patch, maybe we can get rid of the INIT flag and not worry about deletion; we can add the proper delete state transitions when we get to topic deletion.

21. PartitionStateMachine:
21.1 isShuttingDown needs to be set to false in startup, not just in the constructor, since startup can be called multiple times. Ditto for ReplicaStateMachine.
21.2 initializeLeaderAndIsrForPartition(): Do we really need to read leaderAndISR from ZK before updating it? In this case we expect the path not to exist, so we can just create the path with the initial leaderAndISR. If the path already exists, we will get a ZK exception and can log a state change failure (a rough ZkClient sketch follows item 22 below).
21.3 electLeaderForPartition(): We should only update controllerContext.allLeaders if zookeeperPathUpdateSucceeded is true.
21.4 handleStateChange(): We should update controllerContext.allLeaders in both the OfflinePartition and the NonExistentPartition cases.
21.5 Remove unused imports.

22. RequestPurgatoryTest: Are the changes in testRequestExpiry() needed?
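To make 20.4 concrete, here is a minimal, hypothetical sketch (not the actual ReplicaStateMachine code; the state names mirror the discussion above, but the transition table and helper are made up for illustration) of letting every target state accept itself as a valid previous state, so replaying a transition becomes a harmless check instead of an illegal state change:

    // Hypothetical sketch only: illustrates self-transitions, not the real Kafka classes.
    sealed trait ReplicaState
    case object NewReplica extends ReplicaState
    case object OnlineReplica extends ReplicaState
    case object OfflineReplica extends ReplicaState

    object ReplicaTransitions {
      // Every target state lists itself among its valid previous states, so redoing
      // the same transition (e.g. after a controller failover) does not fail.
      val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
        NewReplica     -> Set(NewReplica),
        OnlineReplica  -> Set(NewReplica, OnlineReplica, OfflineReplica),
        OfflineReplica -> Set(NewReplica, OnlineReplica, OfflineReplica)
      )

      def assertValidTransition(current: ReplicaState, target: ReplicaState): Unit =
        require(validPreviousStates(target).contains(current),
          "Replica cannot transition from %s to %s".format(current, target))
    }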
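And for 21.2, a rough sketch of the create-first approach using the ZkClient API; the object, method, path and payload names here are placeholders for illustration, not the real Kafka helpers:

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    object LeaderAndIsrInit {
      // Sketch of 21.2: skip the read-before-write and let the create fail if the path exists.
      def createInitialLeaderAndIsr(zkClient: ZkClient,
                                    leaderAndIsrPath: String,
                                    initialLeaderAndIsrJson: String): Boolean = {
        try {
          // Throws ZkNodeExistsException if the path was already created elsewhere.
          zkClient.createPersistent(leaderAndIsrPath, initialLeaderAndIsrJson)
          true
        } catch {
          case e: ZkNodeExistsException =>
            // Log a state change failure instead of failing the whole transition.
            println("State change failed: %s already exists".format(leaderAndIsrPath))
            false
        }
      }
    }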
23. ControllerBrokerRequestBatch.addRequestForBrokers(): The following statement seems to be unnecessarily creating a new HashMap on each call (except for the first one, which is necessary): brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])

24. KafkaController:
24.1 Can startChannelManager() be part of initializeControllerContext()?
24.2 In ControllerContext.allLeaders, we just need to cache the leader of each partition, not the ISR. The ISR can be changed by the leader at any time, which would invalidate the cache in the controller, and every time the controller wants to update leaderAndISR in ZK it has to read the latest ISR from ZK anyway. Also, there is no need to call updateLeaderAndIsrCache in onBrokerFailure and onBrokerStartup: the leader cache is always updated when there is a leader change, and the ISR always has to be read from ZK before each update.
24.3 We need to hold the controller lock whenever we call onControllerFailover(), since it registers listeners and therefore allows the watcher handlers to update the cached maps in the controller context concurrently. Currently, ZookeeperLeaderElector.startup can call onControllerFailover() without the controller lock.
24.4 It seems that all ZK listeners like BrokerChangeListener, TopicChangeListener and PartitionChangeListener should live in the controller, not inside ReplicaStateMachine and PartitionStateMachine. Both the new-topic and broker-failure watchers are defined in the controller, and the state machines just need to act on state changes.

25. ZookeeperLeaderElector:
25.1 Currently it seems that we can call onBecomingLeader() twice, once in startup and once in LeaderChangeListener.handleDataChange. This may not cause any harm now, but it would be better to avoid it. One possible way is to call onBecomingLeader() in elect() if the election is won; then we can get rid of the code in LeaderChangeListener.handleDataChange and the call to onBecomingLeader() in startup.
25.2 The name ZookeeperLeaderElector seems very generic. Should we rename it to ControllerElector? Ditto for LeaderChangeListener.
25.3 The subscription of leaderChangeListener needs to happen in elect(), after the ephemeral path is created successfully, not in startup. This is because we need to set the watcher each time the controller path is created, and only the subscription sets the watcher.

> Refactor controller state machine
> ----------------------------------
>
>                 Key: KAFKA-499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-499
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: optimization
>         Attachments: kafka-499-v1.patch, kafka-499-v2.patch
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Currently, the controller logic is very procedural and is similar to KafkaZookeeper. Controller should have a well defined state machine with states and transitions. This will make it easier to understand and maintain the controller code.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira