[
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089248#comment-15089248
]
dude commented on KAFKA-1382:
-----------------------------
we hit this bug in kafka0.8.2.1, three nodes. zookeeper version is 3.4.6. the
log is :
[2016-01-05 08:49:27,047] INFO Partition
[error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] on broker 3:
Shrinking ISR for partition [error-signatureId-956a8fd
7-a3ec-4718-bb77-45b3a97eb0cd,0] from 3,1 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:27,227] ERROR Uncaught exception in thread
'kafka-network-thread-39091-0': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
at kafka.network.Processor.close(SocketServer.scala:374)
at kafka.network.Processor.processNewResponses(SocketServer.scala:406)
at kafka.network.Processor.run(SocketServer.scala:318)
at java.lang.Thread.run(Thread.java:745)
[2016-01-05 08:49:27,248] INFO Reconnect due to socket error:
java.io.IOException: connection timeout (kafka.consumer.SimpleConsumer)
[2016-01-05 08:49:27,248] INFO Reconnect due to socket error:
java.io.IOException: Connection reset by peer (kafka.consumer.SimpleConsumer)
[2016-01-05 08:49:27,278] ERROR Uncaught exception in thread
'kafka-network-thread-39091-1': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
at kafka.network.Processor.close(SocketServer.scala:374)
at kafka.network.Processor.processNewResponses(SocketServer.scala:406)
at kafka.network.Processor.run(SocketServer.scala:318)
at java.lang.Thread.run(Thread.java:745)
[2016-01-05 08:49:27,918] INFO re-registering broker info in ZK for broker 3
(kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,312] INFO Registered broker 3 at path /brokers/ids/3 with
address AI-iPaaS-ATS03:39091. (kafka.utils.ZkUtils$)
[2016-01-05 08:49:36,312] INFO done re-registering broker
(kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,313] INFO Subscribing to /brokers/topics path to watch for
new topics (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,332] INFO New leader is 1
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-01-05 08:49:36,343] INFO Partition
[error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] on broker 3: Cached
zkVersion [0] not equal to that in zookeeper, s
kip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,343] INFO Partition
[error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] on broker 3:
Shrinking ISR for partition [error-signatureId-e8c1c14
5-4109-48d8-a46f-4eca92143594,0] from 3,2 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,372] INFO Partition
[error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] on broker 3: Cached
zkVersion [0] not equal to that in zookeeper, s
kip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,373] INFO Partition
[error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] on broker 3:
Shrinking ISR for partition [error-signatureId-59206ee
6-e9b7-470d-9b1d-638e2cc7ebad,0] from 3,2 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,402] INFO Partition
[error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] on broker 3: Cached
zkVersion [0] not equal to that in zookeeper, s
kip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,402] INFO Partition
[error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] on broker 3:
Shrinking ISR for partition [error-signatureId-be6798c
3-57d8-4ddc-a155-04983987b160,0] from 3,1 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,426] INFO Partition
[error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] on broker 3: Cached
zkVersion [0] not equal to that in zookeeper, s
kip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,426] INFO Partition
[error-signatureId-38fd31e8-3a0a-4b06-b278-a8f10bab232f,0] on broker 3:
Shrinking ISR for partition [error-signatureId-38fd31e
> Update zkVersion on partition state update failures
> ---------------------------------------------------
>
> Key: KAFKA-1382
> URL: https://issues.apache.org/jira/browse/KAFKA-1382
> Project: Kafka
> Issue Type: Bug
> Reporter: Joel Koshy
> Assignee: Sriharsha Chintalapani
> Fix For: 0.8.1.2, 0.8.2.0
>
> Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch,
> KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch,
> KAFKA-1382_2014-06-07_09:00:56.patch, KAFKA-1382_2014-06-09_18:23:42.patch,
> KAFKA-1382_2014-06-11_09:37:22.patch, KAFKA-1382_2014-06-16_13:50:16.patch,
> KAFKA-1382_2014-06-16_14:19:27.patch
>
>
> Our updateIsr code is currently:
> private def updateIsr(newIsr: Set[Replica]) {
> debug("Updated ISR for partition [%s,%d] to %s".format(topic,
> partitionId, newIsr.mkString(",")))
> val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch,
> newIsr.map(r => r.brokerId).toList, zkVersion)
> // use the epoch of the controller that made the leadership decision,
> instead of the current controller epoch
> val (updateSucceeded, newVersion) =
> ZkUtils.conditionalUpdatePersistentPath(zkClient,
> ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
> ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
> if (updateSucceeded){
> inSyncReplicas = newIsr
> zkVersion = newVersion
> trace("ISR updated to [%s] and zkVersion updated to
> [%d]".format(newIsr.mkString(","), zkVersion))
> } else {
> info("Cached zkVersion [%d] not equal to that in zookeeper, skip
> updating ISR".format(zkVersion))
> }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
> seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
> the write - but uses the old zkVersion. This fails because the zkVersion
> has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
> is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)