[ 
https://issues.apache.org/jira/browse/KAFKA-498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-498:
--------------------------------

    Attachment: kafka-498-v1.patch

1. Refactoring of current procedural controller code to functional style
1.1. Using switch case statements instead of if-else
1.2. Avoid explicit return statements from try-catch blocks
1.3. Use Option instead of null. This is very important, there were several 
places that handled only the non-null case, basically making way for NPE's in 
error cases.
1.4. Rename variables in all capital letters to camel case
1.5. Fixed logging since a lot of log statements were at info and were somewhat 
unclear
1.6. Fixed indentation, some places uses 4 spaces, others used 2 spaces.
1.7. ZkUtils.scala
1.7.1 Refactored readDataMaybeNull to return an Option instead of null. This 
allows all usages of that API that query an ephemeral path to handle the case 
when the value in zookeeper does not exist anymore
1.7.2. Refactored getBrokerInfoFromIds to handle only a single broker id. The 
reason is that most usages of that API used it to query for a single broker id.
1.8 Many places in the code did not wrap the lines correctly. Fixed this as 
much as I could.

2. KafkaController.scala
2.1/ Renamed deliverLeaderAndISRFromZookeeper to readLeaderAndIsrFromZookeeper
2.2 There is a race condition in KafkaController where it doesn't synchronize 
access to the controller's data structures while creating a new session. 
Basically, controllerRegisterOrFailover is a private API that modifies almost 
all the internal controller data structures that require synchronization. Since 
this API is not synchronizing on the controller lock, all usages of this API 
need to do this correctly. Fixed handleNewSession to synchronize the 
controllerRegisterOrFailover API, since that can be concurrently executed with 
the startup procedure.
2.3 In onBrokerChange, defaulting to empty list instead of null. This lets us 
avoid null checks
2.4 Refactored onBrokerChange() API and moved the leader election logic to a 
separate API. After starting on this path, I figured that this code is going to 
need a major refactoring that I'd like to fix in a separate patch. Filed 
KAFKA-499 to cover that. For now, we can keep this although it will look 
complete only after KAFKA-499 is in.
2.5 onBrokerChange() and initLeaders() APIs are very similar and duplicate 
quite a lot of code. I refactored onBrokerChange() but realized later that 
initLeaders would have to refactored too. To keep the changes in this patch 
small enough, I will fix this as part of KAFKA-499.

3. ControllerChannelManager & KafkaController
3.1 There are 3 types of information that the controller maintains per broker - 
request thread, message queue and socket channel. Currently, they are 
maintained in 3 separate variables and we need to ensure all 3 are synchronized 
correctly. To simplify this, I created a case class to wrap this state in one 
object.
3.2 There is a lock object whose purpose is to synchronize access to the broker 
cache. Currently, the lock doesn't seem to protect all access to these caches, 
which looks like a synchronization bug. Fixed this to have startup API 
synchronize access to the broker cache.
3.3. The addBroker logic was duplicated in 2 places. Refactored the 
constructors to add an auxilary constructor to call the addBroker API that 
handles creating controller state info for a new broker and the appropriate 
synchronization. Currently, the constructor duplicates code to add a new 
broker, probably since it is not right to invoke an API from inside the primary 
constructor
3.4 There are 2 ways to remove a broker from the controller's broker cache - 
during shutdown of the channel manager or when a broker change listener fires. 
Since removeBroker is a public API, it is synchronized using the brokerLock. 
The shutdown API calls removeBroker internally and ends up acquiring and 
releasing the lock multiple times. Ideally, it is sufficient to acquire the 
brokerLock just once for the entire shutdown API. Refactored to move the common 
removeBroker logic to a private API that doesn't synchronize on the brokerLock. 
Changed removeBroker to acquire lock and call removeExistingBroker, changed 
shutdown to acquire lock once and call removeExistingBroker.

4. Renamed LeaderAndISR to LeaderAndIsr

5. Renamed BrokerNotExistException to BrokerNotAvailableException to remain 
consistent with other exceptions of the same type (LeaderNotAvailableException, 
ReplicaNotAvailableException)


NOTE: Apache svn seems to hang at the time of uploading this patch. It will 
apply cleanly on revision 1380945
                
> Controller code has race conditions and synchronization bugs
> ------------------------------------------------------------
>
>                 Key: KAFKA-498
>                 URL: https://issues.apache.org/jira/browse/KAFKA-498
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>              Labels: bugs
>         Attachments: kafka-498-v1.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The controller maintains some internal data structures that are updated by 
> state changes triggered by zookeeper listeners. There are race conditions in 
> the controller channel manager and the controller state machine.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to