[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965567#comment-13965567
 ] 

Jun Rao commented on KAFKA-1382:
--------------------------------

Basically once a controller has updated isr, the leader can't touch it until it 
has received the latest info from the controller. Say a leader had a zk version 
conflict when updating isr and it sees the latest value (written by the 
controller) is the same as the value it's trying the write, and we allow it. 
What could happen is that the leader could successfully shrink the isr again 
before the controller's decision is propagated to all brokers. Once the leader 
shrinks the isr, it can commit new messages, which may not be in the new leader 
that the controller selected. We have lost committed data at this point.

> Update zkVersion on partition state update failures
> ---------------------------------------------------
>
>                 Key: KAFKA-1382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1382
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
>     debug("Updated ISR for partition [%s,%d] to %s".format(topic, 
> partitionId, newIsr.mkString(",")))
>     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
> newIsr.map(r => r.brokerId).toList, zkVersion)
>     // use the epoch of the controller that made the leadership decision, 
> instead of the current controller epoch
>     val (updateSucceeded, newVersion) = 
> ZkUtils.conditionalUpdatePersistentPath(zkClient,
>       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
>     if (updateSucceeded){
>       inSyncReplicas = newIsr
>       zkVersion = newVersion
>       trace("ISR updated to [%s] and zkVersion updated to 
> [%d]".format(newIsr.mkString(","), zkVersion))
>     } else {
>       info("Cached zkVersion [%d] not equal to that in zookeeper, skip 
> updating ISR".format(zkVersion))
>     }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to