[ https://issues.apache.org/jira/browse/KAFKA-499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-499:
--------------------------------

    Attachment: kafka-499-v3.patch

Appreciate your patience in reviewing the controller code (again). Thanks a lot 
for the detailed review!
0. ReplicaStateMachine:
0.1 handleStateChanges(): Right. Changed it to use the cached version in the 
controller context.
0.2 handleStateChange(): Changed it.
0.3 handleStateChange(): Good catch! I had forgotten to include the actual 
request send. In addition to sending the LeaderAndIsr request to the leader 
replica, we should also update the controller's allLeaders cache (see the 
sketch below, after 0.5). And I agree that we shouldn't try to optimize the 
non-follower case, at least not just yet.
0.4 handleStateChange(): You raise a good point here. I think the OnlineReplica 
state description is a little fuzzy right now; I hope that after KAFKA-42 it 
will be clearer. You are right that it is special compared to the others, and 
this is due to the hacky way we use the INIT flag to resend the list of 
partitions to all the replicas. I actually don't think we should be handling 
deletes the way we do today, and I think you've agreed on this as part of 20.5 
below. However, until we have that, OnlineReplica will probably stay special. 
Also, I'm not sure the other states can be subjected to self transitions 
without fully knowing the triggers that can cause such self transitions. I can 
revisit all the states and see if we can safely allow self transitions for 
those as well.
0.5 Awesome, I remember discussing the hackiness of this approach during the 
design discussions. Great that you agree with me here. Let's push this work to 
KAFKA-330. Until then, I've removed the INIT flag support.
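
To make 0.3 and 0.4 concrete, here is a rough sketch of the shape of 
handleStateChange() I have in mind. This is not the actual patch code; the 
state set, the transition map, and names like ReplicaStateMachineSketch and 
sendLeaderAndIsrRequest are simplified stand-ins:

sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState

class ReplicaStateMachineSketch {
  // (topic, partition, replicaId) -> current state of that replica
  private val replicaState =
    collection.mutable.Map.empty[(String, Int, Int), ReplicaState]
  // (topic, partition) -> leader broker id; mirrors the controller's allLeaders cache
  private val allLeaders = collection.mutable.Map.empty[(String, Int), Int]

  // Only OnlineReplica permits a self transition (re-sending partition state);
  // the other self transitions stay disallowed until we know their triggers.
  private val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica     -> Set(OfflineReplica),
    OnlineReplica  -> Set(NewReplica, OnlineReplica, OfflineReplica),
    OfflineReplica -> Set(NewReplica, OnlineReplica))

  def handleStateChange(topic: String, partition: Int, replicaId: Int,
                        targetState: ReplicaState, leader: Int): Unit = {
    val key = (topic, partition, replicaId)
    val currState = replicaState.getOrElse(key, OfflineReplica)
    require(validPreviousStates(targetState).contains(currState),
      s"Illegal state change $currState -> $targetState for replica $key")
    targetState match {
      case OnlineReplica =>
        // The actual request send that was missing (0.3) ...
        sendLeaderAndIsrRequest(replicaId, topic, partition, leader)
        // ... plus the allLeaders cache update that goes with it.
        allLeaders.put((topic, partition), leader)
      case _ => // other target states elided in this sketch
    }
    replicaState.put(key, targetState)
  }

  private def sendLeaderAndIsrRequest(brokerId: Int, topic: String,
                                      partition: Int, leader: Int): Unit =
    println(s"LeaderAndIsr(leader=$leader) for [$topic,$partition] -> broker $brokerId")
}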

1. PartitionStateMachine:
1.1 Right, this probably happened since the startup() API was an afterthought. 
Fixed it.
1.2 initializeLeaderAndIsrForPartition(): Good point. Fixed it.
1.3 electLeaderForPartition(): True, fixed it.
1.4 handleStateChange(): Actually, changing the state of the partition has no 
real effect on the allLeaders cache unless a new leader is elected or the ISR 
has changed. We remove the partition from the allLeaders cache only when it is 
deleted, and we add a partition to this cache when it enters the 
OnlinePartition state.
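
To illustrate that rule, a small sketch (the state names follow the design 
discussion, but the class and signatures are simplified stand-ins):

sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

class PartitionLeaderCacheSketch {
  // (topic, partition) -> current leader broker id
  private val allLeaders = collection.mutable.Map.empty[(String, Int), Int]

  def onStateChange(topic: String, partition: Int, targetState: PartitionState,
                    electedLeader: Option[Int]): Unit = targetState match {
    case OnlinePartition =>
      // A partition enters the cache once a leader has been elected for it.
      electedLeader.foreach(leader => allLeaders.put((topic, partition), leader))
    case NonExistentPartition =>
      // Deletion is the only transition that evicts a partition from the cache.
      allLeaders.remove((topic, partition))
    case _ =>
      // Other state changes leave the cache alone unless a new leader is
      // elected or the ISR changes, which goes through the election path.
  }
}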

2. RequestPurgatoryTest: This was accidental, probably during the rebase. 
Reverted it.

3. ControllerBrokerRequestBatch.addRequestForBrokers(): It creates a new 
hashmap only when that part of the code executes, which is only when the 
brokerId doesn't already exist in the map.
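
For what it's worth, one idiomatic way to express that lazy creation is 
getOrElseUpdate. A simplified sketch (the value type and names here are 
placeholders, not the actual request data structures):

import scala.collection.mutable

object RequestBatchSketch {
  // brokerId -> requests destined for that broker
  val brokerRequestMap =
    mutable.Map.empty[Int, mutable.Map[(String, Int), String]]

  def addRequestForBroker(brokerId: Int, topic: String, partition: Int,
                          leaderAndIsrInfo: String): Unit = {
    // getOrElseUpdate allocates the inner map only when brokerId is absent,
    // so no new hashmap is created for brokers that already have an entry.
    val requests = brokerRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
    requests.put((topic, partition), leaderAndIsrInfo)
  }
}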

4. KafkaController:
4.1 Moved startChannelManager to initializeControllerContext.
4.2 Very good point. I agree that we shouldn't really be caching the ISR. I had 
initially changed it to cover the Online/OfflineReplica state change use case, 
to send the LeaderAndIsr request to the affected brokers, without realizing 
that caching the ISR after that is neither useful nor safe.
4.3 Good catch! Fixed it.
4.4 They were in the controller initially, but I moved them to the respective 
state machines on purpose. The reason is that I thought it would be good for 
each state machine to also own the zookeeper listeners that trigger the state 
changes wrapped in that state machine. The reason the controller has the 
callbacks is that the controller needs to notify multiple state machines about 
a trigger in a certain order. So you can look at the controller as a bunch of 
callbacks required to handle the replication state machine transitions 
correctly. That is, the controller offers a high-level view of all the possible 
meta state changes and triggers that can happen as part of replication; if one 
needs to dig deeper, they can look at the individual state machines for the 
details.
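
A stripped-down sketch of that layering (the onBrokerFailure and 
handleStateChanges signatures are simplified here for illustration):

trait StateMachine {
  def startup(): Unit  // also registers this machine's own zookeeper listeners
  def handleStateChanges(brokerIds: Set[Int]): Unit
}

class ControllerSketch(partitionStateMachine: StateMachine,
                       replicaStateMachine: StateMachine) {
  // A controller callback: it owns no listener logic itself, it only sequences
  // the state machines in the order replication correctness requires.
  def onBrokerFailure(deadBrokerIds: Set[Int]): Unit = {
    partitionStateMachine.handleStateChanges(deadBrokerIds) // re-elect leaders first
    replicaStateMachine.handleStateChanges(deadBrokerIds)   // then offline their replicas
  }
}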

5. ZookeeperLeaderElector:
5.1 Actually, we don't need to call onBecomingLeader in startup(); that is 
redundant. It needs to be called when the leader successfully writes the new 
ephemeral node in zookeeper, so this can happen only once, either in the 
LeaderChangeListener or inside elect(). I moved it to elect().
5.2 It is generic on purpose. When we include the consumer co-ordinator change, 
it will need to use the same ZookeeperLeaderElector component for the election 
of the consumer co-ordinator. Since it is written to be generic, the name is 
generic too.
5.3 Actually, the point is that you just need to register a watcher on startup 
and then on new session creation. Since it is a data change listener, it also 
includes an exists listener. The problem was that on session expiration, it 
wasn't re-registering the leader change listener. This was a bug; I fixed it by 
moving the registration to the first line of the elect() API (see the sketch 
below).
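
Putting 5.1 and 5.3 together, here is a rough sketch of the control flow after 
the fix (the zookeeper calls are stubbed out; the real code uses ZkClient):

class LeaderElectorSketch(electionPath: String, candidateId: Int,
                          onBecomingLeader: () => Unit) {

  // startup() just runs an election; no separate onBecomingLeader call (5.1).
  def startup(): Boolean = elect()

  def elect(): Boolean = {
    // First line: (re)register the listener so that an election triggered by
    // a new session re-registers it too (the fix described in 5.3).
    subscribeDataChanges(electionPath)
    if (tryCreateEphemeralNode(electionPath, candidateId.toString)) {
      onBecomingLeader() // fired only after the zookeeper write succeeds (5.1)
      true
    } else {
      false              // another candidate already holds the ephemeral node
    }
  }

  // Stand-ins for the zookeeper client calls.
  private def subscribeDataChanges(path: String): Unit = ()
  private def tryCreateEphemeralNode(path: String, data: String): Boolean = true
}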

6. Removed all code that referred to the INIT flag in the LeaderAndIsrRequest.

                
> Refactor controller state machine 
> ----------------------------------
>
>                 Key: KAFKA-499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-499
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: optimization
>         Attachments: kafka-499-v1.patch, kafka-499-v2.patch, 
> kafka-499-v3.patch
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Currently, the controller logic is very procedural and is similar to 
> KafkaZookeeper. Controller should have a well defined state machine with 
> states and transitions. This will make it easier to understand and maintain 
> the controller code. 
