[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965458#comment-13965458
 ] 

Guozhang Wang commented on KAFKA-1382:
--------------------------------------

Yeah I meant to say "broker" in the last comment.

Why would we still want to fail the update in this corner case, instead of 
just letting it pass?

> Update zkVersion on partition state update failures
> ---------------------------------------------------
>
>                 Key: KAFKA-1382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1382
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
>     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
>     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
>     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
>     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
>       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
>     if (updateSucceeded){
>       inSyncReplicas = newIsr
>       zkVersion = newVersion
>       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
>     } else {
>       info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
>     }
>   }
>
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.
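
For illustration only, here is a rough sketch of the two ideas in this thread:
refreshing the cached version on a mismatch and retrying, and letting the
update pass when the stored data already matches what we meant to write. It
is not the Kafka code. VersionedStore is a hypothetical in-memory stand-in
for a znode, and updateWithRefresh and its retry limit are made-up names. A
real fix would presumably also have to compare the leader epoch before
retrying, so that it does not overwrite a decision made by the controller.

```scala
object IsrUpdateSketch {

  // Hypothetical stand-in for a ZooKeeper znode: a payload plus a version
  // that increases by one on every successful write.
  final class VersionedStore(initial: String) {
    private var data: String = initial
    private var version: Int = 0

    def read(): (String, Int) = synchronized { (data, version) }

    // Mimics a conditional write: succeeds only when expectedVersion
    // matches the stored version. Returns (succeeded, newVersion).
    def conditionalUpdate(newData: String, expectedVersion: Int): (Boolean, Int) =
      synchronized {
        if (expectedVersion == version) {
          data = newData
          version += 1
          (true, version)
        } else {
          (false, -1)
        }
      }
  }

  // Tries the conditional write. On a version mismatch it re-reads the store:
  //  - if the stored data already equals newData, an earlier attempt must
  //    have landed, so it adopts the stored version and reports success;
  //  - otherwise it retries with the refreshed version, up to retriesLeft
  //    more times.
  // Returns Some(version to cache) on success, None if retries ran out.
  @annotation.tailrec
  def updateWithRefresh(store: VersionedStore,
                        newData: String,
                        cachedVersion: Int,
                        retriesLeft: Int = 3): Option[Int] = {
    val (succeeded, newVersion) = store.conditionalUpdate(newData, cachedVersion)
    if (succeeded) {
      Some(newVersion)
    } else {
      val (currentData, currentVersion) = store.read()
      if (currentData == newData) Some(currentVersion)
      else if (retriesLeft > 0)
        updateWithRefresh(store, newData, currentVersion, retriesLeft - 1)
      else None
    }
  }
}
```

In the timeline above, the first write lands but the broker keeps the old
cached version. With this sketch, the retry would see that the stored data
already matches and would simply pick up the new version instead of failing
forever.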



--
This message was sent by Atlassian JIRA
(v6.2#6252)
