[
https://issues.apache.org/jira/browse/KAFKA-499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456641#comment-13456641
]
Jun Rao commented on KAFKA-499:
-------------------------------
Thanks for patch v2. It's a big change, but it moves things in the right
direction: it makes the logic cleaner and covers some of the corner cases that
were missed before. Excellent job! Some comments:
20. ReplicaStateMachine:
20.1 handleStateChanges(): There is no need to read the partition assignment
from ZK. We can just use the cached version in the controller context.
20.2 handleStateChange(): We can pass leaderAndIsr directly as the last
parameter of the following call, instead of looking it up again:
    brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition,
      controllerContext.allLeaders((topic, partition)))
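That is, a minimal sketch of the suggested form, assuming the leaderAndIsr
value computed earlier in handleStateChange() is still in scope at this point:
    // sketch only: reuse the leaderAndIsr already read in handleStateChange()
    // instead of looking the leader up again in controllerContext.allLeaders
    brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)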
20.3 handleStateChange(): For the OfflineReplica case, we should send new
LeaderISRRequests to the leader broker. We should also increment the leader
epoch. In theory, we only need to generate new LeaderISRRequests for partitions
that lost a follower, not for partitions that lost the leader (the latter are
already handled in partitionStateChange). For those latter partitions, we would
be sending the same LeaderISRRequests twice, once through
partitionStateTransition and again through replicaStateTransition. However, we
don't really need to optimize this right now since the makeLeader process on a
broker is very cheap.
20.4 handleStateChange(): The OnlineReplica case is a bit special in that its
valid previous states include OnlineReplica itself. No other state transition
allows a transition from a state to itself. This seems to be because
initializeReplicaState() already initializes some replicas to the online state.
However, in general, should we allow self-transitions for all states? That
seems safer and would guard against the case where we somehow redo the same
transition.
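For example, one way to allow self-transitions uniformly might be a check along
these lines (sketch only; the helper name and signature are illustrative, not
the patch's actual code):
    // treat a transition from a state to itself as valid for every state,
    // so that redoing the same transition is harmless
    private def assertValidTransition(targetState: ReplicaState,
                                      currentState: ReplicaState,
                                      validPreviousStates: Set[ReplicaState]) {
      require(validPreviousStates.contains(currentState) || currentState == targetState,
        "Invalid state transition from %s to %s".format(currentState, targetState))
    }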
20.5 Before this patch, we send LeaderAndISR requests with the INIT flag in two
cases: once during controller failover and once during broker startup. The only
real purpose of the INIT flag is to ensure that a broker will clean up deleted
topics properly. Thinking about this more, using INIT is really a hacky way to
handle topic deletion once we have a clean state machine. Now that we have one,
a better approach is probably a separate topic deletion path that deletes a
topic only after every broker has received the stop replica request. On both
controller failover and replica startup, the new controller can find all topics
that still need to be deleted and resend the stop replica requests. This seems
to be a better way of handling deletion. So, in this patch, maybe we can get
rid of the INIT flag and not worry about deletion. We can add the proper delete
state transition when we get to topic deletion.
21. PartitionStateMachine:
21.1 isShuttingDown needs to be set to false in startup, not just in the
constructor, since startup can be called multiple times. Ditto for
ReplicaStateMachine.
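A rough sketch of this (assuming isShuttingDown is an AtomicBoolean; the class
name here is only for illustration):
    import java.util.concurrent.atomic.AtomicBoolean

    class StateMachineSketch {   // hypothetical class, stands in for either state machine
      private val isShuttingDown = new AtomicBoolean(false)

      def startup() {
        // reset on every startup, not only in the constructor,
        // since startup() can be invoked again after shutdown()
        isShuttingDown.set(false)
        // ... register listeners, initialize states, etc.
      }

      def shutdown() {
        isShuttingDown.set(true)
      }
    }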
21.2 initializeLeaderAndIsrForPartition(): Do we really need to read
leaderAndISR from ZK before updating it? In this case, we expect that path not
to exist. We can just create the path with the initial leaderAndISR. If the
path already exists, we will get a ZK exception and we can log a state change
failure.
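A sketch of the optimistic-create approach, assuming the
ZkUtils.createPersistentPath helper and zkclient's ZkNodeExistsException used
elsewhere in the code base (leaderAndIsrPath and leaderAndIsrString are
illustrative names, and error() is from the Logging trait):
    // create the leaderAndISR path directly; if it already exists, treat it as a
    // state change failure instead of reading it from ZK first
    try {
      ZkUtils.createPersistentPath(zkClient, leaderAndIsrPath, leaderAndIsrString)
    } catch {
      case e: ZkNodeExistsException =>
        // the path was created concurrently; log the state change failure and abort
        error("LeaderAndIsr path %s already exists, failing the state change".format(leaderAndIsrPath), e)
    }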
21.3 electLeaderForPartition(): we should only update
controllerContext.allLeaders if zookeeperPathUpdateSucceeded is true.
21.4 handleStateChange(): We should update controllerContext.allLeaders in both
the OfflinePartition and NonExistentPartition cases.
21.5 remove unused imports
22. RequestPurgatoryTest: Are the changes in testRequestExpiry() needed?
23. ControllerBrokerRequestBatch.addRequestForBrokers(): The following
statement seems to create a new HashMap unnecessarily on each call (only the
first call for a broker actually needs one):
    brokerRequestMap.getOrElseUpdate(brokerId,
      new mutable.HashMap[(String, Int), LeaderAndIsr])
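For illustration, one explicit way to create the inner map only on the first
call for a given broker (names taken from the snippet above; sketch only):
    // allocate the per-broker map only the first time this broker id is seen
    val requestsForBroker = brokerRequestMap.get(brokerId) match {
      case Some(existing) => existing
      case None =>
        val created = new mutable.HashMap[(String, Int), LeaderAndIsr]
        brokerRequestMap.put(brokerId, created)
        created
    }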
24. KafkaController:
24.1 Can startChannelManager() be part of initializeControllerContext()?
24.2 In ControllerContext.allLeaders, we just need to cache the leader of each
partition, not the ISR. This is because the ISR can be changed by the leader at
any time, which would invalidate the cache in the controller. Every time the
controller wants to update leaderAndISR in ZK, it has to read the latest ISR
from ZK anyway. Also, there is no need to call updateLeaderAndIsrCache in
onBrokerFailure and onBrokerStartup: the leader cache is always updated when
there is a leader change, and the ISR always has to be read from ZK before each
update.
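In other words, the cache could look roughly like this (sketch only; the value
type is just the leader broker id rather than the full LeaderAndIsr):
    // (topic, partition) -> leader broker id; the ISR is intentionally not cached,
    // since it is always re-read from ZK right before any update
    val allLeaders = new mutable.HashMap[(String, Int), Int]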
24.3 We need to hold a controller lock whenever we call onControllerFailover()
since it registers listeners and therefore allows the watcher handler to update
cached maps in controller context concurrently. Currently,
ZookeeperLeaderElector.startup can call onControllerFailover() without the
controller lock.
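A sketch of this, assuming a controllerLock object in ControllerContext (the
field name is illustrative):
    // hold the controller lock across failover so that ZK listeners registered inside
    // onControllerFailover() cannot update the controller context maps concurrently
    controllerContext.controllerLock synchronized {
      onControllerFailover()
    }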
24.4 It seems that all ZK listeners like BrokerChangeListener,
TopicChangeListener and PartitionChangeListener should live in the controller,
not inside ReplicaStateMachine and PartitionStateMachine. That way, both the
new-topic and broker-failure watchers are defined in the controller, and the
state machines just need to act on state changes.
25. ZookeeperLeaderElector:
25.1 Currently, it seems that we can call onBecomingLeader() twice, once in
startup and again in LeaderChangeListener.handleDataChange. This may not cause
any harm now, but it would be better to avoid it. One possible way is to call
onBecomingLeader() in elect() if the election is won (see the sketch after
25.3). Then we can get rid of the code in
LeaderChangeListener.handleDataChange and the call to onBecomingLeader() in
startup.
25.2 The name ZookeeperLeaderElector seems very generic. Should we rename it to
ControllerElector? Ditto for LeaderChangeListener.
25.3 The subscription of leaderChangeListener needs to be done in elect() after
the ephemeral path is created successfully, not in startup. This is because we
need to set the watcher each time the controller path is created and only the
subscription sets the watcher.
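To make 25.1 and 25.3 concrete, a rough sketch of how elect() could be
structured (electionPath, brokerIdString and leaderChangeListener are
illustrative field names; createEphemeralPathExpectConflict and
subscribeDataChanges are the existing ZkUtils / zkclient calls):
    def elect: Boolean = {
      try {
        // create the ephemeral controller path; throws ZkNodeExistsException if
        // another broker won the election first
        ZkUtils.createEphemeralPathExpectConflict(zkClient, electionPath, brokerIdString)
        // 25.3: set the watch only after the path has been created successfully
        zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
        // 25.1: invoke the callback here, exactly once per won election, instead of
        // in startup and in LeaderChangeListener.handleDataChange
        onBecomingLeader()
        true
      } catch {
        case e: ZkNodeExistsException => false   // lost the election
      }
    }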
> Refactor controller state machine
> ----------------------------------
>
> Key: KAFKA-499
> URL: https://issues.apache.org/jira/browse/KAFKA-499
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Priority: Blocker
> Labels: optimization
> Attachments: kafka-499-v1.patch, kafka-499-v2.patch
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> Currently, the controller logic is very procedural and is similar to
> KafkaZookeeper. Controller should have a well defined state machine with
> states and transitions. This will make it easier to understand and maintain
> the controller code.