[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323801#comment-15323801
 ] 

Maysam Yabandeh commented on KAFKA-3693:
----------------------------------------

[~junrao] Yes, you seem to be right.
The controller missed the death of broker 16:
{code}
$ grep "Broker change listener fired" controller.log.2016-05-10.2 
2016-05-10 05:58:30,616 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Broker change listener fired for path /brokers/ids with 
children 21,20,19,17,22,18,15,16,13,14
2016-05-10 06:17:37,577 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Broker change listener fired for path /brokers/ids with 
children 20,17,15,16
2016-05-10 06:29:48,595 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Broker change listener fired for path /brokers/ids with 
children 21,20,19,22,17,18,15,16,13,14
{code}
And it was because broker 16 rejoined ZooKeeper at 06:17:33,432, before the 
controller got notified at 06:17:37,577, so the controller never noticed that the 
ephemeral node of broker 16 had been deleted in the meantime (see the sketch 
after the log excerpt):
{code}
2016-05-10 05:33:57,907 INFO utils.ZkUtils: Registered broker 16 at path /brokers/ids/16 with addresses: PLAINTEXT -> EndPoint(node16.com,9092,PLAINTEXT)
2016-05-10 06:15:16,582 INFO server.KafkaServer: [Kafka Server 16], shutting down
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down completed
2016-05-10 06:17:30,916 INFO server.KafkaServer: starting
2016-05-10 06:17:33,432 INFO utils.ZkUtils: Registered broker 16 at path /brokers/ids/16 with addresses: PLAINTEXT -> EndPoint(node16.com,9092,PLAINTEXT)
{code}
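For what it's worth, a minimal sketch (plain Scala, using the broker IDs from the 
logs above; not the actual controller code) of why such a quick bounce is 
invisible, assuming the listener simply diffs the children list in the 
notification against the controller's cached live-broker set:
{code}
// Illustrative only: the broker-change listener can only compare the children
// list delivered with a notification against the controller's cached view.
val cachedLiveBrokers = Set(21, 20, 19, 17, 22, 18, 15, 16, 13, 14) // cached as of 05:58:30
val childrenAt061737  = Set(20, 17, 15, 16)                         // children in the 06:17:37 event

val newBrokers  = childrenAt061737 -- cachedLiveBrokers  // empty: nothing looks new
val deadBrokers = cachedLiveBrokers -- childrenAt061737  // 21,19,22,18,13,14; broker 16 is not among them

// Broker 16 re-registered at 06:17:33, before the 06:17:37 notification was
// processed, so it sits in both sets and falls into neither difference; the
// deletion of its ephemeral node in between is never observed.
{code}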

So I guess the mystery of why the controller sends incomplete LeaderAndIsr 
requests to brokers is solved. If I may, I would like to emphasize again that 
making the broker code defensive against such cases would offer a stable fix to 
this problem, since we do not know whether there are other existing cases that 
result in similar incomplete LeaderAndIsr messages, or whether future patches 
will introduce new ones. Simple solutions, such as having the broker count the 
number of partitions in the HW checkpoint file and reject the controller's 
message if it contains fewer partitions, should still be compatible with the 
security-enabled cases that you previously mentioned.
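To make that concrete, here is a rough sketch of the kind of check I have in 
mind; the object, method, and parameter names are made up for illustration, and 
this is not existing Kafka code:
{code}
import scala.io.Source

// Sketch of the proposed defensive check (illustrative names, not Kafka APIs).
object LeaderAndIsrSanityCheck {

  // Count the partitions recorded in the replication-offset-checkpoint file,
  // assuming its usual layout: a version line, an entry-count line, then one
  // "topic partition offset" line per partition.
  def partitionsInHwCheckpoint(path: String): Int = {
    val source = Source.fromFile(path)
    try {
      val lines = source.getLines().toList
      if (lines.size >= 2) lines(1).trim.toInt else 0
    } finally {
      source.close()
    }
  }

  // Reject a LeaderAndIsr request that covers fewer partitions than the broker
  // already has high watermarks for; acting on such an incomplete request is
  // what lets checkpointHighWatermarks overwrite the file with a partial view.
  def shouldReject(checkpointPath: String, partitionsInRequest: Int): Boolean =
    partitionsInRequest < partitionsInHwCheckpoint(checkpointPath)
}
{code}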

Regarding the shutdown process, I think I was mistaken; the shutdown process 
took only 2m11s:
{code}
2016-05-10 06:15:16,582 INFO server.KafkaServer: [Kafka Server 16], shutting down
2016-05-10 06:15:16,583 INFO server.KafkaServer: [Kafka Server 16], Starting controlled shutdown
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down completed
{code}
I misread the fetcher thread shutdown as the start of the shutdown process:
{code}
2016-05-10 05:40:46,845 INFO server.ReplicaFetcherThread: [ReplicaFetcherThread-0-15], Shutting down
{code}
Sorry for the confusion.

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread 
> writing the replication-offset-checkpoint file and the handleLeaderAndIsrRequest 
> thread reading from it causes the high watermark of some partitions to be reset 
> to 0. In the good case, this results in the replica truncating its entire log 
> to 0 and hence fetching terabytes of data from the leader broker, which 
> sometimes leads to hours of downtime. We observed bad cases in which the reset 
> offset propagated to the recovery-point-offset-checkpoint file, causing a 
> leader broker to truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario that manifested in our tests:
> # The broker restarts and receives LeaderAndIsr from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions 
> (probably because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions 
> with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in, taking a snapshot of the high 
> watermarks of the (now partial) allPartitions into the 
> replication-offset-checkpoint file {code}
>   def checkpointHighWatermarks() {
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
> {code} hence rewriting the previous high watermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>           val offsetMap = checkpoint.read
>           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>             info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including 
> only a subset of partitions is critical to making this race condition manifest 
> or not. But it is an important detail, since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jump into the middle of 
> processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint file (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
