[ https://issues.apache.org/jira/browse/KAFKA-343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13404797#comment-13404797 ]

Jun Rao commented on KAFKA-343:
-------------------------------

Thanks for the patch. Overall, the patch looks promising. Some comments:

1. KafkaController:
1.1 Is the following global import needed since it's imported in a nested class 
already?
   import collection.JavaConversions._
1.2 ControllerChannelManager: brokers in allBrokers are added twice in the 
constructor.
1.3 BrokerChangeListener: 
  If you want to do something for each item in a collection but don't need an
output collection, use collection.foreach instead of collection.map. For
example, the following uses of map should be changed to foreach:
        addedBrokersSeq.map(controllerChannelManager.addBroker(_))
        deletedBrokerIds.map(controllerChannelManager.removeBroker(_))
1.4 onBrokerChange():
1.4.1 We should use the cached replica assignment allTopicPartitionAssignment in
the controller, instead of re-reading it from ZK on every broker failure.
1.4.2 Instead of sending a leaderAndISRRequest per topicPartition, we should 
batch them and send only 1 leaderAndISRRequest per broker for each 
onBrokerChange() call.
1.4.3 If no assigned replica is in liveBrokerIds, we should log an error for 
the affected topic/partition.
1.4.4 Instead of using curZkPathVersion + 1 as the new version of a ZK path, we 
should use the new ZK version returned in conditionalUpdate. There is no 
guarantee that the new version is always curZkPathVersion + 1.
1.4.5 We should add an info level log for the final leaderAndISRRequest that we 
send to each broker.
1.4.6 If we select a new leader that's not in ISR, we should log a warning and 
indicate that it can cause potential data loss.
1.4.7 ZkUtils.getLeaderAndISRForPartition may return None. We need to handle
that case.
1.5 TopicChangeListener.handleChildChange(): We only need to update 
allTopicPartitionAssignment for the new and deleted topics. 
1.6 handleNewTopics(): We should use allTopicPartitionAssignment, instead of 
reading from ZK.
1.7 allTopicPartitionAssignment: the replicas should be a list, instead of a
set, because the ordering of the replicas is important and the 1st replica is
the preferred one. We won't make use of this in this jira, but we will in the
future.
1.8 initLeaders():
1.8.1 We should log a warning if no assigned replica is live.
1.8.2 Similar to onBrokerChange(), we should try to batch the
LeaderAndISRRequests sent to each broker.
1.8.3 We should set noInit in LeaderAndISRRequest. isInit will only be used in
leaderAndISRRecovery().
1.9 In all ZK event handlers, let's catch all Throwables and log them.
1.10 leaderAndISRRecovery(): We should try to batch the LeaderAndISRRequests
sent to each broker.
1.11 tryToBecomeController(): I would add an explicit return keyword to each of
the return statements to make things clear, since they are not the last
statement in the function.
1.12 handleDeletedTopics():
1.12.1 Deleted partitions have already been removed from
allTopicPartitionAssignment, so we need to pass their assignment in.
1.12.2 We should try to batch the StopReplicaRequests sent to each broker.
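To make the batching in 1.4.2, 1.8.2, 1.10 and 1.12.2 concrete, here is a
minimal sketch of the grouping step. It assumes hypothetical simplified types
TopicAndPartition and LeaderAndIsr (not the classes in the patch) and that a
request goes to every live broker hosting a replica of the partition; building
and sending the actual request is left out.

```scala
// Hypothetical, simplified types for illustration; not the patch's actual classes.
case class TopicAndPartition(topic: String, partition: Int)
case class LeaderAndIsr(leader: Int, isr: List[Int])

object LeaderAndIsrBatcher {
  /** Groups every partition's leader/ISR decision by the live brokers that host
    * a replica of it, so the controller can send one request per broker
    * instead of one request per partition. */
  def batchByBroker(
      assignment: Map[TopicAndPartition, List[Int]],
      decisions: Map[TopicAndPartition, LeaderAndIsr],
      liveBrokerIds: Set[Int]): Map[Int, Map[TopicAndPartition, LeaderAndIsr]] = {
    // One (broker, partition, decision) entry per live replica of each partition.
    val perReplica = for {
      (tp, leaderAndIsr) <- decisions.toSeq
      brokerId <- assignment.getOrElse(tp, Nil)
      if liveBrokerIds.contains(brokerId)
    } yield (brokerId, tp, leaderAndIsr)
    // Collapse the entries into a single partition -> decision map per broker.
    perReplica.groupBy(_._1).map { case (brokerId, entries) =>
      brokerId -> entries.map { case (_, tp, lai) => tp -> lai }.toMap
    }
  }
}
```

The controller would then iterate over the returned map with foreach (per 1.3)
and send one request per broker id.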
 
2. KafkaApis.handleLeaderAndISRRequest(): There is no need to check
if(leaderAndISR.ISR.contains(brokerId)). Dropping the check allows the
controller to bootstrap a new replica (e.g., after changing the number of
partitions or the partition assignment).

3. LeaderAndISRRequest: It's better to change isInit to a boolean and send the
right byte under the covers.
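A minimal sketch of the boolean-in-the-API, byte-on-the-wire idea from item 3,
using java.nio.ByteBuffer. The object name InitFlagCodec and the 1/0 encoding
are assumptions for illustration, not the patch's actual wire format.

```scala
import java.nio.ByteBuffer

// Hypothetical codec: callers see a Boolean; the serialized form is one byte.
object InitFlagCodec {
  // Writes 1 for true and 0 for false.
  def write(buffer: ByteBuffer, isInit: Boolean): Unit =
    buffer.put(if (isInit) 1.toByte else 0.toByte)

  // Treats any non-zero byte as true.
  def read(buffer: ByteBuffer): Boolean = buffer.get() != 0
}
```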

4. KafkaZookeeper.initLocalReplicas(): This should remove local replicas that
are no longer assigned to this broker. Also, this function needs to be called
before registerBrokerInZk(). Otherwise, a deleted topic can be recreated and
assigned to this broker before the local replica has been cleaned up.
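The ordering in item 4 can be sketched as below. BrokerStartup, staleReplicas
and the function parameters are hypothetical names; replicas are modeled as
(topic, partition) pairs, and the real deletion and ZK registration are passed
in as functions.

```scala
// Hypothetical sketch of broker startup ordering; names are illustrative only.
object BrokerStartup {
  /** Local replicas present on this broker that are no longer assigned to it. */
  def staleReplicas(local: Set[(String, Int)],
                    assigned: Set[(String, Int)]): Set[(String, Int)] =
    local -- assigned

  /** Deletes stale local replicas first, then registers the broker in ZK. */
  def start(local: Set[(String, Int)],
            assigned: Set[(String, Int)],
            deleteReplica: ((String, Int)) => Unit,
            registerBrokerInZk: () => Unit): Unit = {
    staleReplicas(local, assigned).foreach(deleteReplica)
    // Register only after cleanup, so a recreated topic assigned to this
    // broker cannot find a leftover replica from the deleted topic.
    registerBrokerInZk()
  }
}
```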

5. AdminUtils: Remove the following line:
  scala.Seq

6. KafkaRequestHandler: Doesn't seem to require any change.

7. LeaderAndISRRequest: It seems that CurrentVersion is more appropriate than 
InitialLeaderAndISRRequestVersion since the version could evolve in the future.

8. Partition.updateISR(): This function is used in 3 places:
ReplicaManager.makeLeader, maybeShrinkISR and recordFollowerPosition. In the
first case, we only need to update the ISR in the cache, since we know the
controller has already updated the ISR in ZK. The latter 2 cases, however, are
initiated by the leader, so we need to update the ISR both in the cache and in
ZK. Therefore, we need 2 versions of updateISR(): one that only updates the
cache and another that updates both the cache and ZK.
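A minimal sketch of the two variants from item 8. PartitionSketch, the method
names and the writeIsrToZk function are hypothetical stand-ins for Partition
and the real ZK update; writing ZK before the cache in the second variant is a
choice made in this sketch, not something the review specifies.

```scala
// Hypothetical sketch: a partition with a cached ISR and a pluggable ZK writer.
class PartitionSketch(val topic: String, val partitionId: Int,
                      writeIsrToZk: (String, Int, Set[Int]) => Unit) {
  @volatile private var isr: Set[Int] = Set.empty

  def inSyncReplicas: Set[Int] = isr

  /** For makeLeader: the controller has already written this ISR to ZK. */
  def updateIsrInCache(newIsr: Set[Int]): Unit = {
    isr = newIsr
  }

  /** For maybeShrinkISR / recordFollowerPosition: the leader changed the ISR
    * itself, so this sketch writes ZK first and then updates the cache. */
  def updateIsrInCacheAndZk(newIsr: Set[Int]): Unit = {
    writeIsrToZk(topic, partitionId, newIsr)
    updateIsrInCache(newIsr)
  }
}
```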

9. Replica.isLeader(): We need to handle the case when the leader doesn't
exist.
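One way to handle item 9 is to model the leader as an Option, so a missing
leader simply means "not the leader". ReplicaSketch and leaderId are
hypothetical names for illustration.

```scala
// Hypothetical sketch: the leader may not exist (not yet elected, or failed),
// so it is an Option rather than a bare broker id.
case class ReplicaSketch(brokerId: Int, leaderId: Option[Int]) {
  // With no leader, this replica is not the leader.
  def isLeader: Boolean = leaderId.contains(brokerId)
}
```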

10. ReplicaManager:
10.1 The indentation of some logging statements that span 2 lines is wrong.
10.2 stopReplica: The deletion of a log should probably be done in LogManager 
with a new function deleteLog.
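A minimal sketch of what a LogManager-owned deleteLog could look like for 10.2.
LogManagerSketch, getOrCreateLog and the "topic-partition" directory layout are
assumptions for illustration, not the actual LogManager API.

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical sketch: the log manager owns both the in-memory registry and the
// on-disk directories, so deletion lives here rather than in ReplicaManager.
class LogManagerSketch(logDir: File) {
  private val logs = mutable.Map[(String, Int), File]()

  def getOrCreateLog(topic: String, partition: Int): File =
    logs.getOrElseUpdate((topic, partition), {
      val dir = new File(logDir, s"$topic-$partition")
      dir.mkdirs()
      dir
    })

  /** Drops the log from the registry and deletes its directory.
    * Returns true if the log existed. */
  def deleteLog(topic: String, partition: Int): Boolean =
    logs.remove((topic, partition)) match {
      case Some(dir) => deleteRecursively(dir); true
      case None      => false
    }

  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, hence the Option.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
```

ReplicaManager.stopReplica would then call deleteLog instead of touching the
files itself.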

11. StopReplicaRequest: It seems that CurrentVersion is more appropriate than 
InitialVersion since the version could evolve in the future.

12. ControllerBasicTest: Why is the test commented out?

13. LeaderElectionTest: why is testEpoch() removed?

14. log4j.properties: Is the change just for debugging?

                
> revisit the become leader and become follower state change operations using 
> V3 design
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-343
>                 URL: https://issues.apache.org/jira/browse/KAFKA-343
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>             Fix For: 0.8
>
>         Attachments: kafka_343.patch
>
>
> We need to reimplement become leader/follower using the controller model 
> described in 
> https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3

