[ https://issues.apache.org/jira/browse/KAFKA-499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Narkhede updated KAFKA-499:
--------------------------------

    Attachment: kafka-499-v3.patch

Appreciate your patience in reviewing the controller code (again). Thanks a lot for the detailed review!

0. ReplicaStateMachine:
0.1 handleStateChanges(): Right. Changed it to use the cached version in the controller context.
0.2 handleStateChange(): Changed it.
0.3 handleStateChange(): Good catch! I had forgotten to include the actual request send. Also, in addition to sending the LeaderAndIsr request to the leader replica, we should also update the controller's allLeaders cache. And I agree that we shouldn't try to optimize the non-follower case, at least not yet.
0.4 handleStateChange(): You raise a good point here. The OnlineReplica state description is a little fuzzy right now; I hope it will be clearer after KAFKA-42. You are right that it is special compared to the others, and this is due to the hacky way we use the INIT flag to resend the list of partitions to all the replicas. I actually don't think we should be handling deletes the way we do today, and I think you've agreed on this as part of 20.5 below. However, until we have that, OnlineReplica will probably stay special. Also, I'm not sure the other states can be subjected to self transitions without fully knowing the triggers that can cause such self transitions. I can revisit all the states and see if we can safely allow self transitions for those as well.
0.5 Awesome, I remember discussing the hackiness of this approach during the design discussions. Great that you agree with me here. Let's push this work to KAFKA-330. Until then, I've removed the INIT flag support.

1. PartitionStateMachine:
1.1 Right, this probably happened since the startup() API was an afterthought. Fixed it.
1.2 initializeLeaderAndIsrForPartition(): Good point. Fixed it.
1.3 electLeaderForPartition(): True, fixed it.
1.4 handleStateChange(): Actually, changing the state of the partition has no real effect on the allLeaders cache unless a new leader is elected or the ISR has changed. We remove a partition from the allLeaders cache only when it is deleted, and we add a partition to the cache when it enters the OnlinePartition state.

2. RequestPurgatoryTest: This was accidental, probably during the rebase. Reverted it.

3. ControllerBrokerRequestBatch.addRequestForBrokers(): It creates a new hashmap only if it executes that part of the code, which is when the brokerId doesn't exist.

4. KafkaController:
4.1 Moved startChannelManager to initializeControllerContext.
4.2 Very good point. I agree that we shouldn't really be caching the ISR. I had initially changed it to cover the Online/OfflineReplica state change use case of sending the LeaderAndIsr request to the affected brokers, without realizing that caching the ISR after that is not useful/safe.
4.3 Good catch! Fixed it.
4.4 They were in the controller initially, but I moved them to the respective state machines on purpose. The reason is that I thought it would be good for each state machine to also own the zookeeper listeners that trigger the state changes wrapped in that state machine. The controller has the callbacks because it needs to notify multiple state machines about a trigger in a certain order. So you can look at the controller as a collection of callbacks required to handle the replication state machine transitions correctly: it offers a high-level view of all the possible meta state changes and triggers that can happen as part of replication. If one needs to dig deeper, they can look at the individual state machines for the details. A rough sketch of this layering follows below.
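To make the layering described in 4.4 concrete, here is a minimal, illustrative Scala sketch (not the actual patch): each state machine registers the zookeeper listener that produces its triggers, while the controller exposes the callback that drives the state machines in a fixed order. It assumes the 0.8-era I0Itec ZkClient API; the names (KafkaControllerSketch, onBrokerStartup, brokerIdsPath, ...) and the particular ordering shown are placeholders.

{code:scala}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import scala.collection.JavaConverters._

// Sketch only: the controller offers the high-level callbacks, while each
// state machine owns the zookeeper listener that produces its triggers.
class KafkaControllerSketch(zkClient: ZkClient) {
  val replicaStateMachine = new ReplicaStateMachineSketch(this, zkClient)
  val partitionStateMachine = new PartitionStateMachineSketch

  // A controller-level callback fixes the order in which the state machines
  // react to a single trigger (here: new brokers registering in zookeeper).
  def onBrokerStartup(newBrokerIds: Seq[Int]): Unit = {
    replicaStateMachine.handleStateChanges(newBrokerIds)
    partitionStateMachine.triggerOnlinePartitionStateChange()
  }
}

class ReplicaStateMachineSketch(controller: KafkaControllerSketch, zkClient: ZkClient) {
  private val brokerIdsPath = "/brokers/ids"

  // The listener lives with the state machine, but it only hands the trigger
  // back to the controller callback, which coordinates all state machines.
  def startup(): Unit = {
    zkClient.subscribeChildChanges(brokerIdsPath, new IZkChildListener {
      override def handleChildChange(parentPath: String,
                                     currentChilds: java.util.List[String]): Unit = {
        controller.onBrokerStartup(currentChilds.asScala.map(_.toInt))
      }
    })
  }

  def handleStateChanges(brokerIds: Seq[Int]): Unit = {
    // transition the replicas hosted on these brokers (details elided)
  }
}

class PartitionStateMachineSketch {
  def triggerOnlinePartitionStateChange(): Unit = {
    // move eligible partitions to OnlinePartition (details elided)
  }
}
{code}

The point is only the layering: a trigger is registered next to the state machine it drives, while cross-machine ordering stays in one place, the controller callback.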
5. ZookeeperLeaderElector:
5.1 Actually, we don't need to call onBecomingLeader in startup(); that is redundant. It needs to be called when the leader successfully writes the new ephemeral node in zookeeper, so this can happen only once, either in the LeaderChangeListener or inside elect(). I moved it to elect().
5.2 It is generic for a purpose. When we include the consumer co-ordinator change, it will require using the same ZookeeperLeaderElector component for the election of the consumer co-ordinator. Since it is written to be generic, the name is generic too.
5.3 Actually, the point is that you just need to register a watcher on startup and then on new session creation. Since it is a data change listener, it also covers an exists listener. The problem was that on session expiration the leader change listener wasn't being re-registered; that was a bug. I fixed it by moving the registration to the first line of the elect() API. A sketch of this flow appears at the end of this message.

6. Removed all code that referred to the INIT flag in the LeaderAndIsrRequest.

> Refactor controller state machine
> ----------------------------------
>
>                 Key: KAFKA-499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-499
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: optimization
>         Attachments: kafka-499-v1.patch, kafka-499-v2.patch, kafka-499-v3.patch
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Currently, the controller logic is very procedural and is similar to KafkaZookeeper. Controller should have a well defined state machine with states and transitions. This will make it easier to understand and maintain the controller code.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira
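For 5.1 and 5.3, here is a minimal, illustrative Scala sketch (not the actual kafka-499-v3.patch) of an elect() that registers the leader change listener as its first step and invokes onBecomingLeader only after the ephemeral node write succeeds. It assumes the 0.8-era I0Itec ZkClient API and a ZkSerializer that round-trips plain strings; ZookeeperLeaderElectorSketch and its parameters are placeholder names.

{code:scala}
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.I0Itec.zkclient.exception.ZkNodeExistsException

class ZookeeperLeaderElectorSketch(zkClient: ZkClient,
                                   electionPath: String,
                                   candidateId: Int,
                                   onBecomingLeader: () => Unit) {

  @volatile private var leaderId = -1

  // A data change listener also fires when the node is created or deleted,
  // so no separate exists watch is needed (point 5.3).
  private val leaderChangeListener = new IZkDataListener {
    override def handleDataChange(dataPath: String, data: Object): Unit = {
      leaderId = data.toString.toInt
    }
    override def handleDataDeleted(dataPath: String): Unit = {
      // The current leader's ephemeral node went away; try to take over.
      elect()
    }
  }

  def elect(): Boolean = {
    // 5.3: (re)register the listener as the very first step, so that calling
    // elect() after a session expiration always re-subscribes the watcher.
    zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    try {
      zkClient.createEphemeral(electionPath, candidateId.toString)
      leaderId = candidateId
      // 5.1: invoked exactly once, after the ephemeral node write succeeds.
      onBecomingLeader()
    } catch {
      case _: ZkNodeExistsException =>
        // Someone else won the election; remember who the leader is.
        leaderId = zkClient.readData[String](electionPath).toInt
    }
    leaderId == candidateId
  }

  def amILeader: Boolean = leaderId == candidateId
}
{code}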