[ 
https://issues.apache.org/jira/browse/KAFKA-499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456641#comment-13456641
 ] 

Jun Rao commented on KAFKA-499:
-------------------------------

Thanks for patch v2. It's a big change, but it moves things in the right 
direction. It makes the logic cleaner and covers some of the corner cases that 
were missed before. Excellent job! Some comments:

20. ReplicaStateMachine:
20.1 handleStateChanges(): There is no need to read the partition assignment 
from ZK. We can just use the cached version in the controller context.
20.2 handleStateChange(): We can pass leaderAndIsr directly as the last 
parameter of the following call:
                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition,
                    controllerContext.allLeaders((topic, partition)))
20.3 handleStateChange(): For the OfflineReplica case, we should send new 
LeaderISRRequests to the leader broker. We should also increment the leader 
epoch. In theory, we just need to generate new LeaderISRRequests for partitions 
that lost a follower, not the leader (those partitions are handled in 
partitionStateChange already). For the latter partitions, we will be sending 
the same LeaderISRRequests twice, once through partitionStateTransition, and 
another through replicaStateTransition. However, we don't really need to 
optimize this right now since the makeLeader process on a broker is very cheap.
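For example, the follower-loss case could look roughly like this (just a 
sketch; the simplified LeaderAndIsr below only approximates the real class):
    // Sketch: when a follower dies, shrink the ISR and bump the leader epoch so
    // that the new LeaderISRRequest supersedes any older one still in flight.
    case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    def onFollowerFailure(current: LeaderAndIsr, failedReplicaId: Int): LeaderAndIsr =
      current.copy(leaderEpoch = current.leaderEpoch + 1,
                   isr = current.isr.filterNot(_ == failedReplicaId))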
20.4 handleStateChange(): The OnlineReplica case is a bit special in that its 
valid previous states include OnlineReplica itself. No other state transition 
allows a transition from a state to itself. This seems to be because 
initializeReplicaState() already initializes some replicas to the online state. 
However, in general, should we allow self transitions for all states? That 
seems safer and would guard against the case where we somehow redo the same 
transition.
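For example, the transition check could treat the target state itself as an 
always-valid previous state (a sketch only; the state names are assumed):
    // Sketch: allow every state to transition to itself so that redoing the same
    // transition is a no-op rather than an illegal state change.
    sealed trait ReplicaState
    case object NewReplica extends ReplicaState
    case object OnlineReplica extends ReplicaState
    case object OfflineReplica extends ReplicaState

    def assertValidTransition(current: ReplicaState, target: ReplicaState,
                              validPreviousStates: Set[ReplicaState]) {
      require(validPreviousStates.contains(current) || current == target,
              "Invalid state transition from %s to %s".format(current, target))
    }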
20.5 Before this patch, we will send LeaderAndISR requests with the INIT flag 
in 2 cases: one during controller failover and another during broker startup. 
The only purpose of the INIT flag is really to ensure that a broker will clean 
up deleted topics properly. Thinking about this more, using INIT is a hacky way 
to handle topic deletion now that we have a clean state machine. A better way 
is probably to use a separate topic deletion path and only delete a topic once 
every broker has received the stop replica request. On both controller failover 
and replica startup, the new controller can find out all topics that still need 
to be deleted and resend the stop replica requests. This seems to be a better 
way of handling deletion. So, in this patch, maybe we can get rid of the INIT 
flag and not worry about deletion. We can add the proper delete state 
transition when we get to topic deletion.
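For example, on failover the new controller could do something roughly like 
this (purely a sketch; none of these helper names come from the patch):
    // Sketch: re-discover topics whose deletion has not completed and re-send
    // stop replica requests to every replica of those topics.
    def resumePendingTopicDeletions(pendingTopics: Set[String],
                                    replicasForTopic: String => Seq[Int],
                                    sendStopReplica: (Int, String) => Unit) {
      for (topic <- pendingTopics; replicaId <- replicasForTopic(topic))
        sendStopReplica(replicaId, topic)
    }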

21. PartitionStateMachine:
21.1 isShuttingDown needs to be set to false in startup, not just in the 
constructor, since startup can be called multiple times. Ditto for 
ReplicaStateMachine.
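For example (a sketch; assuming isShuttingDown is an AtomicBoolean, which may 
differ from the patch):
    import java.util.concurrent.atomic.AtomicBoolean

    class PartitionStateMachine {
      // Sketch: reset the flag on every startup, since startup can be invoked
      // again after a shutdown (e.g. when the controller is re-elected).
      private val isShuttingDown = new AtomicBoolean(false)

      def startup() {
        isShuttingDown.set(false)
        // ... register listeners and trigger the initial state changes ...
      }

      def shutdown() {
        isShuttingDown.set(true)
      }
    }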
21.2 initializeLeaderAndIsrForPartition(): Do we really need to read 
leaderAndISR from ZK before updating it? In this case, we expect that path not 
to exist. We can just create the path with the initial leaderAndISR. If the 
path already exists, we will get a ZK exception and we can log a state change 
failure.
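For example (a sketch using the ZkClient API Kafka already depends on, with 
the data handling simplified):
    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    // Sketch: create the leaderAndISR path optimistically; if it already exists,
    // report a state change failure instead of reading the path first.
    def createLeaderAndIsrPath(zkClient: ZkClient, path: String, leaderAndIsr: String): Boolean =
      try {
        zkClient.createPersistent(path, leaderAndIsr)
        true
      } catch {
        case e: ZkNodeExistsException => false  // caller logs a state change failure
      }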
21.3 electLeaderForPartition(): we should only update 
controllerContext.allLeaders if zookeeperPathUpdateSucceeded is true.
21.4 handleStateChange(): We should update controllerContext.allLeaders in both 
the OfflinePartition and the NonExistentPartition case.
21.5 remove unused imports

22. RequestPurgatoryTest: Are the changes in testRequestExpiry() needed?

23. ControllerBrokerRequestBatch.addRequestForBrokers(): The following 
statement seems to be unnecessarily creating a new HashMap on each call (except 
for the first one, which is necessary).
      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])

24. KafkaController:
24.1 can startChannelManager() be part of initializeControllerContext?
24.2 In ControllerContext.allLeaders, we just need to cache the leader of each 
partition, not the ISR. This is because ISR can be changed by the leader at any 
time, which will invalidate the cache in the controller. Every time that the 
controller wants to update leaderAndISR in ZK, it always has to read the latest 
ISR from ZK anyway. Also, there is no need to call updateLeaderAndIsrCache in 
onBrokerFailure and onBrokerStartup. The leader cache is always updated when 
there is a leader change. ISR always has to be read from ZK before each update.
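In other words, the cache could be as simple as this (sketch; names assumed):
    import scala.collection.mutable

    // Sketch: cache only the current leader per (topic, partition). The ISR is not
    // cached because the leader can change it at any time; it is re-read from ZK
    // before every update anyway.
    val allLeaders = new mutable.HashMap[(String, Int), Int]  // (topic, partition) -> leader broker id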
24.3 We need to hold a controller lock whenever we call onControllerFailover()  
since it registers listeners and therefore allows the watcher handler to update 
cached maps in controller context concurrently. Currently, 
ZookeeperLeaderElector.startup can call onControllerFailover() without the 
controller lock.
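For example (a sketch; controllerLock is an assumed name for whatever lock the 
controller already uses):
    import java.util.concurrent.locks.ReentrantLock

    // Sketch: take the controller lock around failover so that the ZK listeners it
    // registers cannot mutate the controller context concurrently.
    val controllerLock = new ReentrantLock()

    def becomeController(onControllerFailover: () => Unit) {
      controllerLock.lock()
      try {
        onControllerFailover()
      } finally {
        controllerLock.unlock()
      }
    }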
24.4 It seems that all ZK listeners like BrokerChangeListener, 
TopicChangeListener and PartitionChangeListener should be in controller, not 
inside ReplicaStateMachine and PartitionStateMachine. Both new-topic and broker 
failure watchers are defined in controller and state change machines just need 
to act on state changes.

25. ZookeeperLeaderElector:
25.1 Currently, it seems that we can call onBecomingLeader() twice, once in 
startup and another in LeaderChangeListener.handleDataChange. This may not 
cause any harm now, but it would be better if we could avoid it. One possible 
way is to call onBecomingLeader in elect() if this broker wins the election. 
Then we can get rid 
of the code in LeaderChangeListener.handleDataChange and the call to 
onBecomingLeader() in startup.
25.2 The name ZookeeperLeaderElector seems very generic. Should we name it to 
ControllerElector? Ditto for LeaderChangeListener?
25.3 The subscription of leaderChangeListener needs to be done in elect() after 
the ephemeral path is created successfully, not in startup. This is because we 
need to set the watcher each time the controller path is created and only the 
subscription sets the watcher.
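Putting 25.1 and 25.3 together, elect() could look roughly like this (sketch; 
the ZkClient calls are the standard I0Itec ones, everything else is an assumed 
name):
    import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    // Sketch: subscribe the change listener only after the ephemeral path exists,
    // and invoke onBecomingLeader() only if this broker actually wins.
    def elect(zkClient: ZkClient, electionPath: String, brokerId: Int,
              leaderChangeListener: IZkDataListener, onBecomingLeader: () => Unit): Boolean =
      try {
        zkClient.createEphemeral(electionPath, brokerId.toString)
        zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
        onBecomingLeader()
        true
      } catch {
        case e: ZkNodeExistsException =>
          // someone else won; still watch the path so we can re-elect if the leader resigns
          zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
          false
      }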

                
> Refactor controller state machine 
> ----------------------------------
>
>                 Key: KAFKA-499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-499
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: optimization
>         Attachments: kafka-499-v1.patch, kafka-499-v2.patch
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Currently, the controller logic is very procedural and is similar to 
> KafkaZookeeper. Controller should have a well defined state machine with 
> states and transitions. This will make it easier to understand and maintain 
> the controller code. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
