[ https://issues.apache.org/jira/browse/KAFKA-499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Narkhede updated KAFKA-499:
--------------------------------
Attachment: kafka-499-v3.patch
Appreciate your patience in reviewing the controller code (again). Thanks a lot
for the detailed review!
0. ReplicaStateMachine:
0.1 handleStateChanges(): Right. Changed it to use the cached version in
controller context.
0.2 handleStateChange(): Changed it.
0.3 handleStateChange(): Good catch! I had forgotten to include the actual
request send. In addition to sending the LeaderAndIsr request to the leader
replica, we should also update the controller's allLeaders cache (a rough
sketch follows at the end of this section). I also agree that we shouldn't try
to optimize the non-follower case, at least not yet.
0.4 handleStateChange(): You raise a good point here. The OnlineReplica state
description is a little fuzzy right now; I hope that after KAFKA-42 it will be
clearer. You are right that it is special compared to the other states, and
this is due to the hacky way we use the INIT flag to resend the list of
partitions to all the replicas. I actually don't think we should be handling
deletes the way we do today, and I think you've agreed on this as part of 0.5
below. However, until we have that, OnlineReplica will probably stay special.
Also, I'm not sure the other states can safely allow self transitions without
fully knowing the triggers that can cause them. I can revisit all the states
and see if we can safely allow self transitions for those as well.
0.5 Awesome, I remember discussing the hackiness of this approach during the
design discussions. Great that you agree with me here. Let's push this work to
KAFKA-330. Until then, I've removed the INIT flag support.
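To make 0.3 concrete, here is a minimal, self-contained sketch of the intent
(the types and method names below are simplified stand-ins, not the actual
patch code): when a replica comes online, the controller both sends the
LeaderAndIsr request to that replica's broker and refreshes its allLeaders
cache.
{code}
import scala.collection.mutable

case class TopicAndPartition(topic: String, partition: Int)
case class LeaderAndIsr(leader: Int, isr: List[Int])

class OnlineReplicaSketch {
  // controller-side cache of the current leader and isr per partition ("allLeaders")
  val allLeaders = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]

  // stand-in for sending a request through the controller channel manager
  def sendLeaderAndIsrRequest(brokerId: Int, tp: TopicAndPartition, leaderAndIsr: LeaderAndIsr): Unit =
    println(s"LeaderAndIsrRequest -> broker $brokerId for $tp: $leaderAndIsr")

  // the OnlineReplica transition for one replica of one partition
  def onReplicaOnline(replicaId: Int, tp: TopicAndPartition, leaderAndIsr: LeaderAndIsr): Unit = {
    // the part that was missing: actually send the request to the online replica
    sendLeaderAndIsrRequest(replicaId, tp, leaderAndIsr)
    // and keep the controller's view of the partition leadership up to date
    allLeaders.put(tp, leaderAndIsr)
  }
}
{code}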
1. PartitionStateMachine:
1.1 Right, this probably happened since the startup() API was an afterthought.
Fixed it.
1.2 initializeLeaderAndIsrForPartition(): Good point. Fixed it.
1.3 electLeaderForPartition(): True, fixed it.
1.4 handleStateChange(): Actually, changing the state of the partition has no
real effect on the allLeaders cache, unless a new leader is elected or the ISR
has changed. We remove the partition from the allLeaders cache only when it is
deleted, and we add a partition to this cache when it enters the
OnlinePartition state (see the sketch below).
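To spell out the rule from 1.4 in code, here is a toy sketch of how the
partition state transitions touch allLeaders (the state names match the state
machine; everything else is a simplified stand-in, not the patch code):
{code}
import scala.collection.mutable

object AllLeadersCacheSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState
  case object NonExistentPartition extends PartitionState

  case class TopicAndPartition(topic: String, partition: Int)

  // partition -> current leader broker id
  val allLeaders = mutable.Map.empty[TopicAndPartition, Int]

  def handleStateChange(tp: TopicAndPartition, targetState: PartitionState, electLeader: () => Int): Unit =
    targetState match {
      case OnlinePartition =>
        // the only transition that adds to the cache: entering OnlinePartition
        allLeaders.put(tp, electLeader())
      case NonExistentPartition =>
        // the only transition that removes from the cache: the partition is deleted
        allLeaders.remove(tp)
      case NewPartition | OfflinePartition =>
        // no effect on allLeaders unless a later leader election changes it
    }
}
{code}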
2. RequestPurgatoryTest: This was accidental, probably during the rebase.
Reverted it.
3. ControllerBrokerRequestBatch.addRequestForBrokers(): It creates a new
hashmap only when that part of the code executes, which is when there is no
entry for the brokerId yet (see the sketch below).
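To illustrate point 3, here is a self-contained sketch of that pattern (the map
types are simplified stand-ins for the real request entries); the inner hashmap
is only allocated on the branch where the brokerId has no entry yet.
{code}
import scala.collection.mutable

object AddRequestForBrokersSketch {
  // brokerId -> ((topic, partition) -> serialized request payload)
  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[(String, Int), String]]

  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, payload: String): Unit = {
    brokerIds.foreach { brokerId =>
      // the inner map is created only if brokerId is absent, i.e. only when
      // that part of the code executes
      val requests = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
      requests.put((topic, partition), payload)
    }
  }

  def main(args: Array[String]): Unit = {
    addRequestForBrokers(Seq(1, 2), "topicA", 0, "leaderAndIsr-v1")
    addRequestForBrokers(Seq(1), "topicA", 1, "leaderAndIsr-v2")
    println(leaderAndIsrRequestMap) // broker 1 has two entries, broker 2 has one
  }
}
{code}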
4. KafkaController:
4.1 Moved startChannelManager to initializeControllerContext.
4.2 Very good point. I agree that we shouldn't really be caching the ISR. I had
initially changed it to cover the Online/OfflineReplica state change use case
of sending the LeaderAndIsr request to the affected brokers, without realizing
that caching the ISR after that is not useful or safe.
4.3 Good catch! Fixed it.
4.4 They were in the controller initially, but I moved them to the respective
state machines on purpose. I thought it would be good for each state machine to
also own the zookeeper listeners that trigger the state changes wrapped in that
state machine. The controller has the callbacks because it needs to notify
multiple state machines about a trigger in a certain order. So you can look at
the controller as a bunch of callbacks required to handle the replication state
machine transitions correctly; that is, the controller offers a high level view
of all the possible meta state changes and triggers that can happen as part of
replication, and if one needs to dig deeper, they can look at the individual
state machines for the details. A toy sketch of this split follows below.
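Here is that toy sketch for 4.4 (all names below are illustrative, and the
ordering is just an example of "notify the state machines in a certain order"):
the state machines own their listeners and transitions, while a controller
callback only invokes them in the required order.
{code}
// Illustrative only; the real class and method names in the patch differ.
class ReplicaSM {
  def startup(): Unit =
    println("replica state machine: registered its zookeeper listeners")
  def handleBrokerFailure(brokerId: Int): Unit =
    println(s"replica state machine: move replicas on broker $brokerId to OfflineReplica")
}

class PartitionSM {
  def startup(): Unit =
    println("partition state machine: registered its zookeeper listeners")
  def handleBrokerFailure(brokerId: Int): Unit =
    println(s"partition state machine: re-elect leaders for partitions led by broker $brokerId")
}

class ControllerSketch(replicaSM: ReplicaSM, partitionSM: PartitionSM) {
  // the controller callback only sequences the state machines; the actual
  // transitions (and the listeners that trigger them) live inside each one
  def onBrokerFailure(brokerId: Int): Unit = {
    partitionSM.handleBrokerFailure(brokerId)
    replicaSM.handleBrokerFailure(brokerId)
  }
}
{code}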
5. ZookeeperLeaderElector:
5.1 Actually, we don't need to call onBecomingLeader in startup(); that is
redundant. It needs to be called when the leader successfully writes the new
ephemeral node in zookeeper, so it can happen only once, either in the
LeaderChangeListener or inside elect(). I moved it to elect() (see the sketch
at the end of this section).
5.2 It is generic on purpose. When we include the consumer co-ordinator change,
it will need to use the same ZookeeperLeaderElector component for electing the
consumer co-ordinator. Since it is written to be generic, the name is generic
too.
5.3 Actually, the point is that you just need to register a watcher on startup
and then on new session creation. Since it is a data change listener, it also
includes an exists listener. The problem was that on session expiration, the
leader change listener wasn't being re-registered, which is a bug. I fixed it
by moving the registration to the first line of the elect() API.
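For 5.1 and 5.3 together, here is a condensed sketch of the elect() flow after
the fix (zookeeper interaction is stubbed out through the constructor
parameters, and the names are approximate; only the ordering matters): the
leader change listener is (re)registered as the first step of elect(), and
onBecomingLeader fires only when this broker actually wins the ephemeral node
write.
{code}
class LeaderElectorSketch(brokerId: Int,
                          registerLeaderChangeListener: () => Unit,
                          tryCreateEphemeralPath: Int => Boolean, // true if we won the election
                          onBecomingLeader: () => Unit) {

  @volatile var amILeader = false

  def startup(): Unit = {
    // no onBecomingLeader here: startup only kicks off an election
    elect()
  }

  def elect(): Boolean = {
    // 5.3: (re)register the listener as the very first step, so a new session
    // after expiration always ends up watching the leader path again
    registerLeaderChangeListener()
    // 5.1: onBecomingLeader fires only if we successfully wrote the ephemeral
    // leader node, so it runs exactly once per successful election
    if (tryCreateEphemeralPath(brokerId)) {
      amILeader = true
      onBecomingLeader()
    }
    amILeader
  }

  // called on new session after expiration: just re-elect; elect() handles the rest
  def handleNewSession(): Unit = {
    amILeader = false
    elect()
  }
}
{code}
The session expiration path just calls elect() again, which is why the listener
re-registration has to be its first line.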
6. Removed all code that referred to the INIT flag in the LeaderAndIsrRequest.
> Refactor controller state machine
> ----------------------------------
>
> Key: KAFKA-499
> URL: https://issues.apache.org/jira/browse/KAFKA-499
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Priority: Blocker
> Labels: optimization
> Attachments: kafka-499-v1.patch, kafka-499-v2.patch,
> kafka-499-v3.patch
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> Currently, the controller logic is very procedural and is similar to
> KafkaZookeeper. Controller should have a well defined state machine with
> states and transitions. This will make it easier to understand and maintain
> the controller code.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira