[ https://issues.apache.org/jira/browse/KAFKA-343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Ye updated KAFKA-343:
--------------------------
Attachment: kafka_343.diff.4

Answering Neha's reviews:

1. ZkUtils
   1. Remove the debug statement in getEpochForPartition that says “get data, see m”. Debug statements should be added carefully, and only where they will be useful during debugging.
   2. Another debug statement that can be improved -
   3. “Check the leaderEpocAndISR raw string:”. Can you please change this to something like trace(“The leader and isr info %s read from zk path %s”)
   4. Change debug(“the leader....”) to debug(“Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition %d”). Without knowing the topic and partition, this debug statement will not be useful.
   5. Overall, change any log statement that says “get this and check that”. For example, getLeaderForPartition()
      Fixed the logging statements mentioned in the comments above.
   6. Change epoc to epoch everywhere in your code.
      Fixed.
   7. In getPartitionLeaderAndISRForTopics, the input is a list of topics for which leader and isr is to be fetched, but some partitions for which leader and isr is not available are missing from the return value. This is hard to understand from the caller's POV. Either it should return None or throw an exception, so that the caller can handle the Option value or the exception and know that some partitions don't have leader and isr info.
      Omitting them is reasonable here, because this function is only called from getLeaderAndISRFromZookeeper() in the controller, to recover the leader and ISR info from ZooKeeper (if any). Some partitions may not have leader and ISR info in ZooKeeper yet, which does not matter, since it will be initialized later.
   8. What is the value of changing readData to readDataMaybeNull in getBrokerInfoFromIds? Basically you are returning invalid Broker objects with null broker info from that API, which doesn't seem useful to the caller. The fix is the same as above: either throw an exception (like before) or change it to return Option.
      Inside createBroker(), I handle the case where brokerInfo is null: a BrokerNotExistException (a new exception added in this patch) is thrown, and it is caught in makeFollower() of ReplicaManager. When a broker intends to follow a leader that is down, this exception is thrown and caught, and the corresponding error code is returned.

2. KafkaController.scala
   1. Rename recoverLeaderAndISRFromZookeeper to getLeaderAndISRFromZookeeper
      OK.
   2. In the recoverLeaderAndISRFromZookeeper() API, get is blindly invoked on an Option variable returned from allPartitionReplicaAssignment. Scala options force us to handle invalid/missing values cleanly; please use Option correctly everywhere in your code.
      Actually it was already handled implicitly, though with a wasted function call. Before, it was:

        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
        if(brokersAssignedToThisPartitionOpt == None){
          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
        } else{
          val relatedBrokersAssignedToThisPartition = allPartitionReplicaAssignment.get(topicPartition).get.filter(brokerIds.contains(_))
          ....
        }

      Now it reuses the first lookup:

        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
        if(brokersAssignedToThisPartitionOpt == None){
          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
        } else{
          // safe here: the None case is handled in the branch above
          val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
          ....
        }

   3. The following info statement is unclear - “On leaderAndISR recover, ....”. Please change it to “After reading leader and ISR from zookeeper, ...”
      I've changed this to use a clearer class-level log prefix.
   4. What is the purpose of leaderAndISRRecovery, since it just calls recoverLeaderAndISRFromZookeeper anyway? Let's remove this API.
      OK.
   5. All info statements in the controllerRegisterOrFailover() API are unclear. Please follow the info statements in leaderElection() in KafkaZookeeper (the code you deleted) and fix these.
      With the class-level log prefix, they should read better now.

3. Partition.scala
   1. Change updateISR back to take in an optional zkclient instead of null and a separate boolean value. That makes it easier to handle a missing zk client instead of running into an NPE.
      Done.
   2. Parentheses should be on the same line in updateISR. Try to follow a consistent coding convention.
      I use the "LinkedIn style" auto-indentation of IntelliJ. It automatically indents the following code snippet to:

        inSyncReplicas = newISR.map {r =>
          getReplica(r) match {
            case Some(replica) => replica
            case None => throw new KafkaException("ISR update failed. No replica for id %d".format(r))
          }
        }

      while I prefer:

        inSyncReplicas = newISR.map { r =>
          getReplica(r) match {
            case Some(replica) => replica
            case None => throw new KafkaException("ISR update failed. No replica for id %d".format(r))
          }
        }

   3. Why is the try-catch-finally clause removed while acquiring/releasing the leaderAndISRLock? This is very dangerous and can lead to the lock never being released in some failure cases.
      Now they're back.
   4. Keep the info/error statements like those in updateISRInZk (code that you deleted).
      Done.

4. ReplicaManager.scala
   1. The deleteLog change seems pretty hacky. What happens if the log is being written to while processing the stop replica request? It doesn't seem that this scenario is handled and tested in this patch. Please revert all log deletion related changes and leave a TODO comment in stopReplica. We have another JIRA filed for delete topic; maybe we can handle it cleanly as part of that? This will also help reduce the scope of this patch, which is supposed to only handle become leader and become follower.
      I think that's a good idea.
   2. Change all log4j statements that say “On broker, blah” to “Blah on broker”
      Now I'm using a class-level log prefix.
   3. Do makeLeader and makeFollower ever return an error code other than NoError?
      Now makeFollower does; I added a "TODO" for makeLeader().

5. Log.scala
   1. Revert
      Maybe not applicable now.
6. LogManager.scala
   1. Revert
      Maybe not applicable now.
7. FileMessageSet.scala
   1. Revert
      Reverted.
8. SimpleConsumer
   1. Revert
      Reverted.
9. AbstractFetcherManager
      Maybe not applicable now.
10. KafkaServer.scala
   1. Remove commented-out code in addReplica
      Done.
11. KafkaController.scala
   1. Change all log4j statements of the format “Controller %d see blah” to something meaningful like “Blah is %s on controller %d”
      Changed.
12. Testing
   12.1. In TestUtils:
      1. Change variables ISR1 and ISR2 to lower case. Also make the same change in ZkUtils and anywhere else you've used all-uppercase to denote a variable. Also, rename createSampleLeaderAndISRResponse to createTestLeaderAndISRResponse.
         Changed.
      2. waitUntilLeaderIsElected should be improved with this patch to handle new leader election as well as existing leader change.
         Now improved with a new function, waitUntilLeaderIsElectedOrChanged().
   12.2. In ControllerBasicTest, change assertEquals to say “Controller should be on broker 1”. You might want to remove the debug statement that says “command send test finishes”.
      Fixed.
   12.3. Why are all tests deleted from ControllerToBrokerRequestTest.scala?
      This test is now covered by ControllerBasicTest, so it no longer adds value.
   12.4. LeaderElectionTest and ControllerBasicTest throw a couple of new errors with this patch.
      Each of them has now been either identified as not an actual error or fixed.

More description:
This patch mainly addresses the various kinds of exceptions that can occur while handling leader and ISR requests. The controller-side logic is now pretty clear.
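The Option handling discussed under KafkaController item 2 (and ZkUtils items 7 and 8) can be written as a single pattern match, so the assignment map is consulted once and no bare get is needed. This is a minimal sketch under stated assumptions, not the patch's code: the TopicPartition alias, the liveReplicasFor helper, and the println standing in for warn(...) are all hypothetical.

```scala
object OptionHandlingSketch {
  // Hypothetical alias: (topic, partition), matching the tuple used in the warn message above.
  type TopicPartition = (String, Int)

  // Hypothetical helper: returns the live brokers assigned to a partition,
  // or None when the partition has no replica assignment at all.
  def liveReplicasFor(assignment: Map[TopicPartition, Seq[Int]],
                      liveBrokerIds: Set[Int],
                      tp: TopicPartition): Option[Seq[Int]] =
    assignment.get(tp) match {
      case None =>
        // Stand-in for warn(...): report the missing assignment and let the caller decide.
        println("No replica assignment for partition [%s, %d]".format(tp._1, tp._2))
        None
      case Some(brokers) =>
        // The single lookup above is reused; no second get(...).get is needed.
        Some(brokers.filter(liveBrokerIds.contains))
    }

  def main(args: Array[String]): Unit = {
    val assignment = Map(("topic1", 0) -> Seq(1, 2, 3))
    println(liveReplicasFor(assignment, Set(1, 3), ("topic1", 0)))
    println(liveReplicasFor(assignment, Set(1, 3), ("topic1", 1)))
  }
}
```

Returning Option keeps the "no assignment" case visible in the type, which is the point of the review comment: the caller must match on it (or use getOrElse) rather than risk a NoSuchElementException from get.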
Also, I changed waitUntilLeaderIsElected() to waitUntilLeaderIsElectedOrChanged().
I added a class-level log prefix (log ident) for KafkaController, KafkaServer, LogManager, Log, ReplicaManager, ReplicaFetcherThread, etc., so the log output should now be easier to read.
I made quite a few changes to the ZkUtils class. Most importantly, the partition is now always treated as an integer, whereas before it was a String in many places.
All unit tests passed. However, with low probability the broker failure system test fails by losing a few messages; I also find that the latest code in 0.8 fails the broker failure system test by losing messages. We need to look into this problem in detail, but the general controller-side logic in this patch is clear.

> revisit the become leader and become follower state change operations using V3 design
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-343
>                 URL: https://issues.apache.org/jira/browse/KAFKA-343
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>             Fix For: 0.8
>
>         Attachments: kafka_343.diff.2, kafka_343.diff.3, kafka_343.diff.4, kafka_343.patch
>
>
> We need to reimplement become leader/follower using the controller model described in
> https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
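The class-level log prefix the comment describes for KafkaController, ReplicaManager and the other classes can be sketched as a small trait: each class sets its prefix once, and every message it logs carries that prefix, so individual statements no longer need "On broker %d, ..." wording. This is an illustration only; the PrefixedLogging trait, its logPrefix field, the in-memory buffer (used instead of log4j so the sketch runs standalone), and ControllerSketch are all hypothetical, not the classes from the patch.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical logging trait: every message is prefixed with a per-class string.
trait PrefixedLogging {
  // Each class assigns this once, e.g. "[Controller 1]: ".
  protected var logPrefix: String = ""

  // Stand-in for a log4j appender: messages are collected in memory.
  val logged: ListBuffer[String] = ListBuffer.empty[String]

  def info(msg: => String): Unit = logged += (logPrefix + msg)
}

// Hypothetical controller: the broker id lives in the prefix, not in each message.
class ControllerSketch(brokerId: Int) extends PrefixedLogging {
  logPrefix = "[Controller %d]: ".format(brokerId)

  def onLeaderChange(topic: String, partition: Int, leader: Int): Unit =
    info("Leader for partition [%s, %d] is %d".format(topic, partition, leader))
}

object PrefixedLoggingDemo {
  def main(args: Array[String]): Unit = {
    val controller = new ControllerSketch(1)
    controller.onLeaderChange("topic1", 0, 2)
    controller.logged.foreach(println)
  }
}
```

Because the prefix is set in the class body, which runs after the trait's own initializer, every message logged after construction carries the owning component's identity.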