[ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332618#comment-15332618 ]

Maysam Yabandeh commented on KAFKA-3693:
----------------------------------------

Thinking more about this, the complexity seems to stem from the controller not 
explicitly distinguishing the first LeaderAndIsrRequest, which has a very 
special meaning, from the others. Looking back at the 
[design|https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3],
there was an *isInit* field envisioned in the LeaderAndIsrRequest to serve this 
purpose:
{code}
LeaderAndISRRequest {
  request_type_id         : int16 // the request id
  version_id              : int16 // the version of this request
  client_id               : int32 // this can be the broker id of the controller
  ack_timeout             : int32 // the time in ms to wait for a response
  isInit                  : byte  // whether this is the first command issued by a controller
  leaderAndISRMap         : Map[(topic: String, partitionId: int32) => LeaderAndISR] // a map of LeaderAndISR
}
{code}
However, I do not seem to be able to find this field in 
[trunk|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java]:
{code}
    public static final Schema LEADER_AND_ISR_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
                                                                      new Field("controller_epoch", INT32, "The controller epoch."),
                                                                      new Field("partition_states",
                                                                                new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
                                                                      new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
    public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0};
{code}

I am not sure why the *isInit* field did not make it from the design into the 
implementation, but with this field available it would be pretty straightforward 
to make brokers defensive against such corner cases, i.e., have a broker reject 
the first LeaderAndIsrRequest it receives if isInit is not set.
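
A minimal sketch of what such a defensive check could look like, as a standalone 
Scala snippet rather than a patch against the real broker code; the isInit flag 
does not exist in the current protocol, and LeaderAndIsrRequest below is a 
stand-in case class, so all names are for illustration only:
{code}
// Sketch only: the real kafka.api.LeaderAndIsrRequest has no isInit field today;
// this stand-in case class and the gate below just illustrate the idea.
case class LeaderAndIsrRequest(controllerId: Int, controllerEpoch: Int, isInit: Boolean)

class LeaderAndIsrGate {
  @volatile private var firstRequestSeen = false

  /** Returns true if the request should be processed, false if it should be rejected. */
  def accept(request: LeaderAndIsrRequest): Boolean = {
    if (!firstRequestSeen) {
      firstRequestSeen = true
      // The first LeaderAndIsrRequest a broker sees after startup should be the
      // controller's initial, complete snapshot; rejecting a partial update here
      // keeps the highwatermark checkpoint from being rewritten with only a
      // subset of partitions.
      request.isInit
    } else {
      true
    }
  }
}
{code}
Presumably a rejected request would be answered with an error code so the 
controller could resend a complete LeaderAndIsrRequest with isInit set.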

> Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread 
> writing the replication-offset-checkpoint file and the handleLeaderAndIsrRequest 
> thread reading from it causes the high watermark for some partitions to be reset 
> to 0. In the good case, this causes the replica to truncate its entire log to 0 
> and hence start fetching terabytes of data from the leader broker, which 
> sometimes leads to hours of downtime. In the bad cases we observed, the reset 
> offset can propagate to the recovery-point-offset-checkpoint file, making a 
> leader broker truncate the file. This has the potential to cause data loss if 
> the truncation happens on both the follower and the leader brokers.
> This is the particular faulty scenario that manifested in our tests:
> # The broker restarts and receives a LeaderAndIsr request from the controller.
> # The LeaderAndIsr message, however, does not contain all the partitions 
> (probably because other brokers were churning at the same time).
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions 
> with only the partitions included in the LeaderAndIsr message: {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in and snapshots the high 
> watermarks of the replicas in the (now partial) allPartitions map into the 
> replication-offset-checkpoint file {code}
>   def checkpointHighWatermarks() {
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
> {code} hence rewriting the previous high watermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>           val offsetMap = checkpoint.read
>           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>             info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message containing 
> only a subset of partitions is essential to making this race condition 
> manifest. But it is an important detail, since it clarifies that a solution 
> based on not letting the highwatermark-checkpoint thread jump in during the 
> processing of a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint file (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts; a rough sketch 
> follows this quoted description.
> Thoughts?
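
Roughly, that proposed pre-initialization could look like the following 
standalone Scala sketch. TopicAndPartition here is a stand-in for the 
kafka.common class, ReplicaManagerSketch is a made-up name, and the checkpoint 
file is assumed to follow the usual layout of a version line, an entry-count 
line, and then one "topic partition offset" line per partition:
{code}
import java.io.File
import scala.collection.mutable
import scala.io.Source

// Stand-in for kafka.common.TopicAndPartition; sketch only.
case class TopicAndPartition(topic: String, partition: Int)

// Made-up class name; in the real code this would run during ReplicaManager start-up.
class ReplicaManagerSketch(checkpointFile: File) {
  // Stand-in for ReplicaManager.allPartitions, keyed by topic-partition, with the
  // checkpointed high watermark kept as a placeholder value.
  private val allPartitions = mutable.Map.empty[TopicAndPartition, Long]

  // Register every partition listed in replication-offset-checkpoint before any
  // LeaderAndIsrRequest is handled, so a later checkpoint pass cannot rewrite the
  // file with only the subset of partitions seen so far.
  def preInitializeFromCheckpoint(): Unit = {
    val lines = Source.fromFile(checkpointFile).getLines().toList
    // Assumed layout: first line is the version, second line is the entry count,
    // then "topic partition offset" per line.
    lines.drop(2).foreach { line =>
      line.trim.split("\\s+") match {
        case Array(topic, partition, hw) =>
          allPartitions.getOrElseUpdate(TopicAndPartition(topic, partition.toInt), hw.toLong)
        case _ => // ignore malformed lines in this sketch
      }
    }
  }
}
{code}
This way the first highwatermark-checkpoint pass would still see every partition 
the broker hosted before the restart, even if the first LeaderAndIsr message only 
covers a subset of them.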



