[ https://issues.apache.org/jira/browse/KAFKA-13483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen resolved KAFKA-13483.
-------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

> Stale Missing ISR on a partition after a zookeeper timeout
> ----------------------------------------------------------
>
>                 Key: KAFKA-13483
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13483
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 2.8.1
>            Reporter: F Méthot
>            Priority: Major
>             Fix For: 3.1.0
>
>
> We hit a situation where we had a stale missing ISR on a single partition of an output changelog topic after a "broker to zookeeper" connection timed out in our production system. This ticket shows the logs of what happened and a workaround that got us out of this situation.
>
> *Cluster config*
> 7 Kafka Brokers v2.8.1 (k8s bitnami)
> 3 Zookeeper v3.6.2 (k8s bitnami)
> kubernetes v1.20.6
>
> *Processing pattern:*
> {code:java}
> source topic
> -> KStream application: update 40 stores backed by
> -> data-##-changelog topics {code}
>
> All topics have {*}10 partitions{*}, {*}3 replicas{*}, *min.isr 2*
> After a broker to zookeeper connection timed out (see logs below), the ISR went missing on many topic partitions.
> Almost all partitions recovered a few milliseconds later, as the connection to zk was re-established.
> The exception was partition 3 of *one* of the 40 data-##-changelog topics.
> It stayed under-replicated overnight, preventing any progress on partition 3 of the source topic in the kstream app, and at the same time halting production of data on partition 3 of the 39 other changelog topics (because they also rely on partition 3 of the input topic).
>
> +*Successful Workaround*+
> We ran kafka-reassign-partitions.sh on that partition, with the exact same replica config, and the ISR came back to normal in a matter of milliseconds.
> {code:java}
> kafka-reassign-partitions.sh --bootstrap-server kafka.kafka.svc.cluster.local:9092 --reassignment-json-file ./replicas-data-23-changlog.json {code}
> where replicas-data-23-changlog.json contains the original replica assignment
> {code:java}
> {"version":1,"partitions":[{"topic":"data-23-changelog","partition":3,"replicas":[1007,1005,1003]}]}
> {code}
> (A short sketch of the CLI checks for confirming the partition's ISR state is included after the questions below.)
>
> +*Questions:*+
> Would you be able to provide an explanation of why that specific partition did not recover like the others after the zk timeout?
> Or could it be a bug?
> We are glad the workaround worked, but is there an explanation of why it did?
> Otherwise, what should have been done to address this issue?
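>
> For reference, here is a minimal sketch of the CLI checks that can confirm the state of a stuck partition such as data-23-changelog-3 before and after the reassignment. Both commands are standard tooling shipped in Kafka's bin/ directory; the broker address is taken from this incident, while the ZooKeeper address and its 2181 port are assumptions that would need to be adapted to the actual service.
> {code:java}
> # List partitions whose ISR is smaller than the replica set; the stuck
> # partition keeps showing up here until its ISR recovers.
> kafka-topics.sh --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
>   --describe --under-replicated-partitions
>
> # Inspect the leader/ISR state recorded in ZooKeeper for that partition
> # (the 2181 port is an assumption).
> zookeeper-shell.sh zookeeper.kafka.svc.cluster.local:2181 \
>   get /brokers/topics/data-23-changelog/partitions/3/state
> {code}
> Comparing the two views can help show whether ZooKeeper still holds the shrunken ISR or whether the broker is stuck with a pending ISR change it never managed to commit.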
>
> +*Observed summary of the logs*+
>
> {code:java}
> [2021-11-20 20:21:42,577] WARN Client session timed out, have not heard from server in 26677ms for sessionid 0x2000086f5260006 (org.apache.zookeeper.ClientCnxn)
> [2021-11-20 20:21:42,582] INFO Client session timed out, have not heard from server in 26677ms for sessionid 0x2000086f5260006, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-11-20 20:21:44,644] INFO Opening socket connection to server zookeeper.kafka.svc.cluster.local
> [2021-11-20 20:21:44,646] INFO Socket connection established, initiating session, client: , server: zookeeper.kafka.svc.cluster.local (org.apache.zookeeper.ClientCnxn)
> [2021-11-20 20:21:44,649] INFO Session establishment complete on server zookeeper.kafka.svc.cluster.local, sessionid = 0x2000086f5260006, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
>
> [2021-11-20 20:21:57,133] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
> [2021-11-20 20:21:57,137] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=1896541533, epoch=50199) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
> java.io.IOException: Client was shutdown before response was read
>     at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
>     at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
>     at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:217)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:325)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:141)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:140)
>     at scala.Option.foreach(Option.scala:407)
>     at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:140)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:123)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> [2021-11-20 20:21:57,141] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> [2021-11-20 20:21:57,141] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
> [2021-11-20 20:21:57,145] INFO [ReplicaFetcher replicaId=1007, leaderId=1005, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
> [2021-11-20 20:21:57,145] INFO [ReplicaFetcher replicaId=1007, leaderId=1005, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> [2021-11-20 20:21:57,146] INFO [ReplicaFetcher replicaId=1007, leaderId=1005, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
> [2021-11-20 20:22:24,577] WARN [Partition data-23-changelog-3 broker=1007] Failed to update ISR to PendingExpandIsr(isr=Set(1007), newInSyncReplicaId=1003) due to unexpected UNKNOWN_SERVER_ERROR. Retrying. (kafka.cluster.Partition)
> [2021-11-20 20:22:24,578] ERROR [broker-1007-to-controller] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient)
> java.lang.IllegalStateException: Failed to enqueue ISR change state LeaderAndIsr(leader=1007, leaderEpoch=2, isr=List(1007, 1003), zkVersion=2) for partition data-23-changelog-3
>     at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1379)
>     at kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1413)
>     at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1392)
>     at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1370)
>     at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1370)
>     at kafka.server.DefaultAlterIsrManager.$anonfun$handleAlterIsrResponse$8(AlterIsrManager.scala:262)
>     at kafka.server.DefaultAlterIsrManager.$anonfun$handleAlterIsrResponse$8$adapted(AlterIsrManager.scala:259)
>     at scala.collection.immutable.List.foreach(List.scala:431)
>     at kafka.server.DefaultAlterIsrManager.handleAlterIsrResponse(AlterIsrManager.scala:259)
>     at kafka.server.DefaultAlterIsrManager$$anon$1.onComplete(AlterIsrManager.scala:179)
>     at kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManager.scala:362)
>     at kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManager.scala:333)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
>     at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:74)
>     at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
> At this point we ran kafka-reassign-partitions.sh on that partition, with the exact same config, and it fixed the issue.
> {code:java}
> kafka-reassign-partitions.sh --bootstrap-server kafka.kafka.svc.cluster.local:9092 --reassignment-json-file ./replicas-data-23-changlog.json
>
> where replicas-data-23-changlog.json contains
> {"version":1,"partitions":[{"topic":"data-23-changelog","partition":3,"replicas":[1007,1005,1003]}]}
> {code}
>
>
> Log showing ISR is back to normal:
> {code:java}
> [2021-11-21 12:36:36,727] INFO [Admin Manager on Broker 1007]: Updating broker 1007 with new configuration : (kafka.server.ZkAdminManager)
> [2021-11-21 12:36:36,747] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
> [2021-11-21 12:36:36,749] INFO Processing override for entityPath: brokers/1005 with config: Map() (kafka.server.DynamicConfigManager)
> [2021-11-21 12:36:36,752] INFO Processing override for entityPath: brokers/1007 with config: Map() (kafka.server.DynamicConfigManager)
> ...
> [2021-11-21 12:37:19,435] INFO [Partition data-23-changelog-3 broker=1007] ISR updated to 1007,1005 and version updated to [4] (kafka.cluster.Partition)
> [2021-11-21 12:37:19,928] INFO [Partition data-23-changelog-3 broker=1007] ISR updated to 1007,1005,1003 and version updated to [5] (kafka.cluster.Partition) {code}
>
>
> Thank you kindly for any comments and suggestions!
> Francois


--
This message was sent by Atlassian Jira
(v8.20.7#820007)