[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15315055#comment-15315055
 ] 

Maysam Yabandeh edited comment on KAFKA-3693 at 6/10/16 3:54 AM:
-----------------------------------------------------------------

*update:* the start and end times of broker 16's shutdown originally quoted here 
were not correct, which caused a lot of confusion; they are corrected in this 
update to avoid confusing future readers.

Thanks [~junrao]. My understanding is that the LeaderAndIsr message with the 
partial partition list failed to be delivered to broker 16 and was finally 
delivered when broker 16 came back online.

Let me paste more of the relevant logs; perhaps they will make the buggy 
scenario clearer.

At 06:17:02,012 the controller attempts to send the LeaderAndIsr message to 
broker 16:

{code}
2016-05-10 06:17:02,012 TRACE change.logger: Controller 17 epoch 269 sending 
become-leader LeaderAndIsr request 
(Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269) to broker 16 for 
partition [topic.xyz,134]
{code}

This attempt, however, fails:
{code}
2016-05-10 06:17:02,076 WARN controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17's connection to broker 
Node(16, node16.com, 9876) was unsuccessful
{code}
because broker 16 was shutting down from 06:15:16,582 until 06:17:27,160:
{code}
2016-05-10 06:15:16,582 INFO server.KafkaServer: [Kafka Server 16], shutting 
down
2016-05-10 06:15:16,583 INFO server.KafkaServer: [Kafka Server 16], Starting 
controlled shutdown
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down 
completed
{code}

The controller repeats the attempt until it finally connects to broker 16 at 
06:17:33,344
{code}
2016-05-10 06:17:33,344 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17 connected to Node(16, 
node16.com, 9876) for sending state change requests
{code}
once broker 16 has restarted. 

Here are the first 5 state changes that broker 16 performs right after the 
restart:
{code}
2016-05-10 06:17:33,410 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:21,ISR:21,LeaderEpoch:58,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:20,21)
 for partition [topic.qwe,3] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2398
2016-05-10 06:17:33,438 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:14,ISR:14,LeaderEpoch:110,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:20,14)
 for partition [topic.asd,88] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2399
2016-05-10 06:17:33,440 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:15,ISR:15,LeaderEpoch:18,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:20,15)
 for partition [topic.zxc,8] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2400
2016-05-10 06:17:33,442 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:21,ISR:21,LeaderEpoch:61,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:21,20)
 for partition [topic.iop,4] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2401
2016-05-10 06:17:33,447 TRACE change.logger: Broker 16 received LeaderAndIsr 
request 
(LeaderAndIsrInfo:(Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:16,20)
 correlation id 2402 from controller 17 epoch 269 for partition [topic.xyz,134]
{code}
The last of these is the LeaderAndIsr request with the incomplete list of 
partitions that we were talking about. I do not see any other trace in the 
controller log indicating that a similar message was sent to broker 16, which 
tells me the LeaderAndIsr message received at 06:17:33,447 must be the same one 
that was formed at 06:17:02,012 and was retried once broker 16 was back online.
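
For intuition on why a request formed at 06:17:02,012 could still be delivered 
at 06:17:33,447: the logs above show a dedicated 
Controller-17-to-broker-16-send-thread, and my understanding is that such a 
send thread drains a per-broker queue of state-change requests, retrying until 
the connection succeeds. Here is a minimal sketch of that queue-and-retry 
behaviour (illustrative names only, not the actual controller code):
{code}
// Illustrative sketch of a per-broker queue drained by a send thread that
// retries until the target broker is reachable; simplified names, not the
// actual kafka.controller code.
import java.util.concurrent.LinkedBlockingQueue

case class StateChangeRequest(correlationId: Int, description: String)

class BrokerSendThread(brokerId: Int,
                       queue: LinkedBlockingQueue[StateChangeRequest],
                       tryConnect: () => Boolean) extends Thread {
  override def run(): Unit = {
    while (true) {
      val request = queue.take()   // a request formed while the broker is down waits here
      var sent = false
      while (!sent) {              // ...and is retried until the broker is reachable again
        if (tryConnect()) {
          println(s"delivered to broker $brokerId: $request")
          sent = true
        } else {
          Thread.sleep(300)        // connection unsuccessful; back off and retry
        }
      }
    }
  }
}
{code}
Under such a model, the become-leader request queued at 06:17:02,012 is simply 
the one delivered on the first successful connection at 06:17:33,344, rather 
than a newly generated message.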

Does the above make sense?



> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread 
> writing the replication-offset-checkpoint file and the handleLeaderAndIsrRequest 
> thread reading from it causes the high watermark for some partitions to be 
> reset to 0. In the good case, this causes the replica to truncate its entire 
> log to 0 and hence to fetch terabytes of data from the leader broker, which 
> sometimes leads to hours of downtime. We have observed bad cases in which the 
> reset offset propagates to the recovery-point-offset-checkpoint file, causing 
> a leader broker to truncate the file. This seems to have the potential to lead 
> to data loss if the truncation happens at both the follower and leader brokers.
> This is the particular faulty scenario that manifested in our tests:
> # The broker restarts and receives a LeaderAndIsr request from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions 
> (probably because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions 
> with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in, taking a snapshot of the 
> replicas' high watermarks in the (still partial) allPartitions and writing it 
> to the replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
>     val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence overwriting the previously checkpointed 
> high watermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>           val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>           val offsetMap = checkpoint.read
>           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>             info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
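> The following is a simplified, self-contained model of the interleaving in 
> steps 3-5 above (hypothetical names and data, not the actual broker code); it 
> shows how the checkpoint snapshot drops a previously persisted high watermark 
> and how the later read then falls back to 0: {code}
> // Hypothetical model of the race; not the actual ReplicaManager/Partition code.
> object CheckpointRaceSketch extends App {
>   // replication-offset-checkpoint contents persisted before the restart
>   var checkpointFile: Map[String, Long] = Map("topicA-0" -> 123456L)
>
>   // in-memory partitions; value = the local replica's high watermark, if the
>   // replica has already been created
>   val allPartitions = scala.collection.mutable.Map.empty[String, Option[Long]]
>
>   // step 3: getOrCreatePartition registers the partition named in the partial
>   // LeaderAndIsr request, but its local replica does not exist yet
>   allPartitions("topicA-0") = None
>
>   // step 4: the highwatermark-checkpoint thread snapshots only partitions that
>   // already have a local replica and rewrites the whole file from that snapshot
>   checkpointFile = allPartitions.collect { case (tp, Some(hw)) => tp -> hw }.toMap
>   println(checkpointFile)                // Map() -- the old entry is gone
>
>   // step 5: getOrCreateReplica reads the rewritten file; with no entry left,
>   // the high watermark falls back to 0 and the replica truncates its whole log
>   val highWatermark = checkpointFile.getOrElse("topicA-0", 0L)
>   println(highWatermark)                 // 0
> }
> {code}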
> We are not entirely sure whether the initial LeaderAndIsr message including 
> only a subset of partitions is critical for this race condition to manifest. 
> But it is an important detail, since it clarifies that a solution based on not 
> letting the highwatermark-checkpoint thread jump into the middle of processing 
> a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint file (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
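> A rough sketch of that idea, with hypothetical helper signatures rather than a 
> patch against the actual ReplicaManager: {code}
> object StartupInitSketch {
>   // Hypothetical sketch of the proposed fix; names are illustrative only.
>   def initializePartitionsFromCheckpoints(
>       checkpointedOffsets: Map[(String, Int), Long],       // entries read from the checkpoint file(s) at start-up
>       getOrCreatePartition: (String, Int) => Unit): Unit = {
>     // Pre-register every checkpointed partition before any LeaderAndIsr
>     // request is processed, so a high-watermark snapshot taken at any later
>     // point still covers them and cannot drop their entries from the file.
>     checkpointedOffsets.keys.foreach { case (topic, partitionId) =>
>       getOrCreatePartition(topic, partitionId)
>     }
>   }
> }
> {code}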
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
