[ 
https://issues.apache.org/jira/browse/KAFKA-343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Ye updated KAFKA-343:
--------------------------

    Attachment: kafka_343.diff.4

Answering Neha's reviews

1. ZkUtils
1. Remove the debug statement in getEpochForPartition that says “get data, see 
m”. Debug statements should be carefully added and should be added to be useful 
during debugging.
2. Another debug statement that can be improved -
3. “Check the leaderEpocAndISR raw string:”. Can you please change this to 
something like trace(“The leader and isr info %s read from zk path %s”)
4. Change debug(“the leader....”) to debug(“Leader %d, Epoch %d, isr %s, zk 
path version %d for topic %s and partition %d”). Without knowing the topic and 
partition, this debug statement will not be useful
5. Overall, change any log statement that says “get this and check that”. For 
example, getLeaderForPartition()


The above comments about the logging statements are fixed.



6. Change epoc to epoch everywhere in your code

fixed

7. In getPartitionLeaderAndISRForTopics, the input is a list of topics for 
which leader and isr is to be fetched, but in the output, some partitions for 
which leader and isr is not available are missing from the return value. This 
is hard to understand from the caller's POV. Either it should return None or 
throw an exception so that the caller can handle Option value or an exception 
to know that some partitions don't have leader and isr info.

This behavior is reasonable, because the function is only called from 
getLeaderAndISRFromZookeeper() in the controller, to recover the leader and ISR 
info from zookeeper (if there is any). It's possible that some partitions don't 
have a leader and ISR in zookeeper, which does not matter (they will be 
initialized later).



8. What is the value of change readData to readDataMaybeNull in 
getBrokerInfoFromIds ? Basically you are returning invalid Broker objects with 
null broker info from that API, which doesn't seem useful to the caller. The 
fix is same as above. Either throw exception (like before) or change it to 
return Option.

Inside createBroker(), I handle the case where brokerInfo is null: a 
BrokerNotExistException (an exception I added) is thrown and then caught in the 
makeFollower() function of ReplicaManager. When a broker tries to follow a 
leader that is down, this exception is thrown and caught, and the corresponding 
error code is returned.
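
A rough sketch of that flow, not the code in the patch: only the name 
BrokerNotExistException comes from the description above. The Broker case 
class, the "host:port" info format, and the error code values are assumptions 
made up for illustration.

```scala
object BrokerLookupSketch {
  // Exception name taken from the description; everything else is hypothetical.
  class BrokerNotExistException(message: String) extends RuntimeException(message)

  case class Broker(id: Int, host: String, port: Int)

  val NoError: Short = 0
  val BrokerNotExistError: Short = 1 // hypothetical error code value

  // brokerInfo stands for the raw string read from zookeeper; it is null when
  // the broker's node does not exist. The "host:port" layout is an assumption.
  def createBroker(id: Int, brokerInfo: String): Broker = {
    if (brokerInfo == null)
      throw new BrokerNotExistException("Broker id %d does not exist".format(id))
    val parts = brokerInfo.split(":")
    Broker(id, parts(0), parts(1).toInt)
  }

  // The follower path turns the missing-leader case into an error code
  // instead of letting the exception escape.
  def makeFollower(leaderId: Int, leaderInfo: String): Short =
    try {
      createBroker(leaderId, leaderInfo)
      NoError
    } catch {
      case _: BrokerNotExistException => BrokerNotExistError
    }
}
```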



2. KafkaController.scala
1. Rename recoverLeaderAndISRFromZookeeper to getLeaderAndISRFromZookeeper

OK


2. In recoverLeaderAndISRFromZookeeper () API, get is blindly invoked on an 
Option variable returned from allPartitionReplicaAssignment. Scala options 
force us to handle invalid/missing values cleanly, please use Option correctly 
everywhere in your code.

Actually it was implicitly handled (with a wasted function call). Before, it 
was like:

        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
        if(brokersAssignedToThisPartitionOpt == None){
          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s"
            .format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
        } else{
          val relatedBrokersAssignedToThisPartition =
            allPartitionReplicaAssignment.get(topicPartition).get.filter(brokerIds.contains(_))
          ....
        }

Now it is:
        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
        if(brokersAssignedToThisPartitionOpt == None){
          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s"
            .format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
        } else{
          val relatedBrokersAssignedToThisPartition =
            brokersAssignedToThisPartitionOpt.filter(brokerIds.contains(_))
          ....
        }
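
For comparison, the same check can be written with a pattern match, so the 
Option is never unwrapped with get. This is only a sketch and not the code in 
the patch: the object and method names and the Map/Set parameter types are 
assumptions, and println stands in for warn to keep it self-contained.

```scala
object ReplicaAssignmentSketch {
  // Hypothetical stand-in for the controller's cached assignment:
  // (topic, partition) -> assigned broker ids.
  type TopicPartition = (String, Int)

  // Returns the assigned brokers that are currently alive, or None when the
  // partition has no replica assignment at all.
  def liveAssignedBrokers(
      assignment: Map[TopicPartition, Seq[Int]],
      liveBrokerIds: Set[Int],
      topicPartition: TopicPartition): Option[Seq[Int]] =
    assignment.get(topicPartition) match {
      case Some(assignedBrokers) =>
        // Filter the broker ids inside the Seq, not the Option itself.
        Some(assignedBrokers.filter(liveBrokerIds.contains))
      case None =>
        println("no replica assignment for partition [%s, %d]"
          .format(topicPartition._1, topicPartition._2))
        None
    }
}
```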




3. The following info statement is unclear - “On leaderAndISR recover, ....”. 
Please change it to “After reading leader and ISR from zookeeper, ...”

I've changed this to use the class-level log indent, which reads better.



4. What is the purpose of leaderAndISRRecovery since it just calls 
recoverLeaderAndISRFromZookeeper anyways ? Let's remove this API

OK


5. All info statements in controllerRegisterOrFailover() API are unclear. 
Please follow the info statements in leaderElection() in KafkaZookeeper(the 
code you deleted) and fix these.

With the class-level log indent, it should be better.


3. Partition.scala
1. Change updateISR back to take in an optional zkclient instead of null and a 
separate boolean value. That way it makes it easier to handle a missing zk 
client instead of running into NPE

Done
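
A minimal sketch of the shape Neha asked for, assuming a simplified stand-in 
for the zookeeper client. IsrStore, writeIsr and PartitionSketch are 
hypothetical names, not the real ZkClient API or the code in the patch.

```scala
object UpdateIsrSketch {
  // Hypothetical stand-in for the zookeeper client; only the call used here.
  trait IsrStore {
    def writeIsr(isr: Set[Int]): Unit
  }

  class PartitionSketch {
    var inSyncReplicas: Set[Int] = Set.empty

    // With an Option there is no null client and no separate boolean flag:
    // None means "update the in-memory ISR only".
    def updateISR(newISR: Set[Int], zkClientOpt: Option[IsrStore] = None): Unit = {
      inSyncReplicas = newISR
      zkClientOpt.foreach(_.writeIsr(newISR))
    }
  }
}
```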


2. Parentheses should be on the same line in updateISR. Try to follow 
consistent coding convention.


I use the "LinkedIn style" auto-indentation in IntelliJ. It automatically 
indents the following code snippet to:

      inSyncReplicas = newISR.map {r =>
        getReplica(r) match {
          case Some(replica) => replica
          case None => throw new KafkaException("ISR update failed. No replica for id %d".format(r))
        }
                                  }

While I prefer:
      
      inSyncReplicas = newISR.map
      {
        r =>
          getReplica(r) match {
            case Some(replica) => replica
            case None => throw new KafkaException("ISR update failed. No replica for id %d".format(r))
          }
      }      



3. Why is the try-catch-finally clause removed while acquiring/releasing the 
leaderAndISRLock ? This is very dangerous and can lead to the lock never being 
released in some failure cases.

Now they're back
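
The pattern in question looks roughly like the sketch below. It assumes the 
lock is a java.util.concurrent.locks.ReentrantLock, which may not match the 
patch, and withLeaderAndISRLock is a hypothetical helper name.

```scala
import java.util.concurrent.locks.ReentrantLock

object LockSketch {
  // Assumption: the lock is a ReentrantLock.
  private val leaderAndISRLock = new ReentrantLock()

  // Runs body while holding the lock. The finally clause guarantees the lock
  // is released even if body throws.
  def withLeaderAndISRLock[T](body: => T): T = {
    leaderAndISRLock.lock()
    try {
      body
    } finally {
      leaderAndISRLock.unlock()
    }
  }

  def isLocked: Boolean = leaderAndISRLock.isLocked
}
```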

4. Keep the info/error statements like those in updateISRInZk (code that you 
deleted)

Done




4. ReplicaManager.scala
1. The deleteLog change seems pretty hacky. What happens if the log was being 
written to while processing the stop replica request ? It doesn't seem like 
that this scenario is handled and tested in this patch. Please revert all log 
deletion related changes, leave a TODO comment in stopReplica. We have another 
JIRA filed for delete topic, maybe we can handle it cleanly as part of that ? 
This will also help reduce the scope of this patch which is supposed to only 
handle become leader and become follower.

I think that's a good idea



2. Change all log4j statements that say “On broker, blah” to “Blah on broker”
Now I'm using the class-level log indent.



3. Does makeLeader and makeFollower ever return an error code other than 
NoError ?
Now it does for makeFollower. I added a "TODO" for makeLeader().



5. Log.scala
1. Revert
Maybe not applicable now


6. LogManager.scala
1. Revert
Maybe not applicable now


7. FileMessageSet.scala
1. Revert

Reverted


8. SimpleConsumer
1. Revert

Reverted


9. AbstractFetcherManager

Maybe not applicable now


10. KafkaServer.scala
1. Remove commented out code in addReplica

done



11. KafkaController.scala
1. Change all log4j statements of this format “Controller %d see blah” to 
something meaningful like “Blah is %s on controller %d”

Changed



12. Testing

12.1. In TestUtils,
1. change variables ISR1 and ISR2 to lower case. Also make the same change in 
ZkUtils and anywhere else where you've used all-uppercase to denote a variable. 
Also, rename createSampleLeaderAndISRResponse to createTestLeaderAndISRResponse

Changed


2. waitUntilLeaderIsElected should be improved with this patch to handle new 
leader election as well as existing leader change.

Now improved with a new function waitUntilLeaderIsElectedOrChanged()
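
The idea is a polling loop that succeeds either when any leader appears or when 
the leader differs from a known old one. The sketch below is an assumption 
about the shape, not the TestUtils code: the readLeader callback, the parameter 
list, and the poll interval are all hypothetical.

```scala
object LeaderWaitSketch {
  // readLeader: hypothetical lookup of the current leader for one partition.
  // oldLeaderOpt = None     -> wait until any leader is elected.
  // oldLeaderOpt = Some(id) -> wait until the leader is different from id.
  // Returns the leader found, or None if the timeout expires first.
  def waitUntilLeaderIsElectedOrChanged(
      readLeader: () => Option[Int],
      timeoutMs: Long,
      oldLeaderOpt: Option[Int] = None,
      pollIntervalMs: Long = 10L): Option[Int] = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var result: Option[Int] = None
    while (result.isEmpty && System.currentTimeMillis() < deadline) {
      // Accept the current leader only if it is not the old leader.
      result = readLeader().filter(leader => !oldLeaderOpt.contains(leader))
      if (result.isEmpty) Thread.sleep(pollIntervalMs)
    }
    result
  }
}
```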



12.2. In ControllerBasicTest, change assertEquals to say “Controller should be 
on broker 1”. You might want to remove the debug statement that says “command 
send test finishes”

Fixed



12.3. Why are all tests deleted from ControllerToBrokerRequestTest.scala ?
These tests are now covered by ControllerBasicTest, so keeping them separately 
adds no value.



12.4. LeaderElectionTest and ControllerBasicTest throws couple of new errors 
with this patch - 
They have now either been identified as not being actual errors or been fixed.



More description:

This patch mainly addresses the various kinds of exceptions that can occur 
while handling leader and ISR requests. The controller-side logic is now pretty 
clear.


I also changed waitUntilLeaderIsElected() to 
waitUntilLeaderIsElectedOrChanged().

I added class-level log indents for KafkaController, KafkaServer, LogManager, 
Log, ReplicaManager, ReplicaFetcherThread, etc. The log output should now be 
easier to read.

I made quite a few changes to the ZkUtils class. Most importantly, the 
partition is now always treated as an integer; before, it was a String in a lot 
of cases.

All unit tests pass. However, I find that, with low probability, the broker 
failure system test fails by losing a few messages. The latest code in 0.8 also 
fails the broker failure system test by losing messages, so we need to look 
into this problem in detail. But the general controller-side logic in this 
patch is clear.




                
> revisit the become leader and become follower state change operations using 
> V3 design
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-343
>                 URL: https://issues.apache.org/jira/browse/KAFKA-343
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>             Fix For: 0.8
>
>         Attachments: kafka_343.diff.2, kafka_343.diff.3, kafka_343.diff.4, 
> kafka_343.patch
>
>
> We need to reimplement become leader/follower using the controller model 
> described in 
> https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

