Re: Query regarding kafka controller shutdown
Thanks for your reply. Replication factor is 2 and min.insync.replicas is the default (1). I will get back to you on the producer's acks settings.

Regards,
Dhiraj

On Sun, Jun 5, 2022 at 1:20 PM Liam Clarke-Hutchinson wrote:
> Off the top of my head, it looks like it lost network connectivity to some
> extent.
>
> Question - what settings were used for topics like efGamePlay? What is min
> insync replicas, replication factor, and what acks settings is the producer
> using?
>
> Cheers,
>
> Liam
>
> On Fri, 3 Jun 2022 at 22:55, dhiraj prajapati wrote:
> >
> > Hi all,
> > Recently we faced an issue with one of our production kafka clusters:
> > - It is a 3 node cluster
> > - kafka server version is 1.0
> >
> > *Issue*:
> > One of the brokers had some problem resulting in the following:
> > 1. The broker lost leadership of all of the topic-partitions
> > 2. However the kafka server process did *NOT* get stopped/killed.
> >
> > I have attached the exception traces from controller and server logs at
> > that time at the end of this email.
> >
> > *Questions:*
> > 1. What could be the reason behind this happening?
> > 2. How can I reproduce this exact scenario in our test environment?
> >
> > P.S. We did not see any GC logs, or general network blips at around that
> > time.
> >
> > *Exception trace in controller log:*
> >
> > [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting (kafka.controller.RequestSendThread)
> > [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting (kafka.controller.RequestSendThread)
> > [2022-05-21 09:07:48,354] ERROR [Controller id=2] Error while electing or becoming controller on broker 2 (kafka.controller.KafkaController)
> > kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
> >   at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:233)
> >   at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> >   at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> >   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> >   at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
> >   at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:215)
> >   at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> >   at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> >   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> >   at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:214)
> >   at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1461)
> >   at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1437)
> >   at kafka.zk.KafkaZkClient.getPartitionReassignment(KafkaZkClient.scala:671)
> >   at kafka.controller.KafkaController.initializePartitionReassignment(KafkaController.scala:698)
> >   at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:655)
> >   at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:232)
> >   at kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1203)
> >   at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1479)
> >   at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> >   at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> >   at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.sc
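[Editor's note: a sketch for readers following the acks discussion above. With replication factor 2, a producer using acks=1 (or 0) can lose acknowledged messages when leadership moves during an incident like this one. The following is a minimal, hypothetical durability-leaning producer configuration; broker addresses are placeholders, and plain string keys are used so only java.util.Properties is needed.]

```java
import java.util.Properties;

public class DurableProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Placeholder bootstrap addresses -- replace with your brokers.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // acks=all: the leader acknowledges only after the full in-sync
        // replica set has the record, so a leader change does not drop
        // acknowledged writes.
        props.put("acks", "all");
        // Retry transient failures (e.g. NotLeaderForPartition during a
        // leadership move) instead of surfacing them to the application.
        props.put("retries", "2147483647");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

Note that acks=all only guarantees redundancy up to min.insync.replicas; with the default of 1, a write can still be acknowledged while only the leader has it.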
Query regarding kafka controller shutdown
Hi all,
Recently we faced an issue with one of our production kafka clusters:
- It is a 3 node cluster
- kafka server version is 1.0

*Issue*:
One of the brokers had some problem resulting in the following:
1. The broker lost leadership of all of the topic-partitions
2. However the kafka server process did *NOT* get stopped/killed.

I have attached the exception traces from controller and server logs at that time at the end of this email.

*Questions:*
1. What could be the reason behind this happening?
2. How can I reproduce this exact scenario in our test environment?

P.S. We did not see any GC logs, or general network blips at around that time.

*Exception trace in controller log:*

[2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting (kafka.controller.RequestSendThread)
[2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,354] ERROR [Controller id=2] Error while electing or becoming controller on broker 2 (kafka.controller.KafkaController)
kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
  at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:233)
  at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
  at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
  at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
  at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:215)
  at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
  at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
  at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:214)
  at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1461)
  at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1437)
  at kafka.zk.KafkaZkClient.getPartitionReassignment(KafkaZkClient.scala:671)
  at kafka.controller.KafkaController.initializePartitionReassignment(KafkaController.scala:698)
  at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:655)
  at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:232)
  at kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1203)
  at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1479)
  at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
  at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
  at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
  at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2022-05-21 09:07:48,358] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
[2022-05-21 09:07:48,358] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(2, 1) (kafka.controller.KafkaController)
[2022-05-21 09:07:48,360] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2022-05-21 09:07:48,361] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ReplicaStateMachine)
[2022-05-21 09:07:48,361] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,365] INFO [Controller id=2] Resigned (kafka.controller.KafkaController)

*Snippets
Re: Kafka broker startup issue
Thanks for pointing this out. There was a broker instance of version 0.10.1.0 running.

On May 23, 2017 11:34 AM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> Version 2 of UpdateMetadataRequest does not exist in version 0.9.0.1. This
> suggests that you have a broker with a newer version of Kafka running
> against the same ZK broker. Do you have any other versions running? Or is
> it possible this is a shared ZK cluster and you're not using a namespace
> within ZK for each cluster?
>
> -Ewen
>
> On Mon, May 22, 2017 at 12:33 AM, dhiraj prajapati <dhirajp...@gmail.com> wrote:
> >
> > Hi,
> > I am getting the below exception while starting kafka broker 0.9.0.1:
> >
> > kafka.common.KafkaException: Version 2 is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.
> >   at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:58)
> >   at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:54)
> >   at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:54)
> >   at kafka.network.RequestChannel$Request.(RequestChannel.scala:66)
> >   at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
> >   at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at kafka.network.Processor.run(SocketServer.scala:421)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > What could be the issue?
> >
> > Regards,
> > Dhiraj
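[Editor's note: one way to hunt for the mismatched broker Ewen describes is to inspect what is registered in ZooKeeper. A sketch using the zookeeper-shell.sh tool shipped with Kafka; the host and broker id below are placeholders.]

```shell
# Connect to the same ZooKeeper ensemble the 0.9.0.1 broker points at.
bin/zookeeper-shell.sh zk1:2181
# Then, at the prompt:
#   ls /brokers/ids     -- lists every registered broker id in this chroot
#   get /brokers/ids/0  -- the JSON registration includes the broker's
#                          endpoints (host:port), so each id can be traced
#                          back to a machine and its installed Kafka version
```

If the cluster shares a ZooKeeper ensemble with another Kafka cluster without per-cluster chroots, unexpected broker ids will show up here.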
Kafka broker startup issue
Hi,
I am getting the below exception while starting kafka broker 0.9.0.1:

kafka.common.KafkaException: Version 2 is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.
  at kafka.api.UpdateMetadataRequest$.readFrom(UpdateMetadataRequest.scala:58)
  at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:54)
  at kafka.api.RequestKeys$$anonfun$7.apply(RequestKeys.scala:54)
  at kafka.network.RequestChannel$Request.(RequestChannel.scala:66)
  at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
  at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
  at scala.collection.Iterator$class.foreach(Iterator.scala:742)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at kafka.network.Processor.run(SocketServer.scala:421)
  at java.lang.Thread.run(Thread.java:745)

What could be the issue?

Regards,
Dhiraj
Commitlog path while Upgrading kafka server from 0.9 to 0.10.2.0
Hi,
I want to do a rolling upgrade of kafka server from 0.9 to 0.10.2.0. Should I keep the path of the commit logs the same? What is the impact of keeping the path the same vs. different?

Thanks in advance.
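[Editor's note: for context, the 0.10.x upgrade notes describe a rolling upgrade in which the data directory stays put and only protocol/format versions are pinned during the transition. Below is a sketch of the relevant server.properties entries; the path is a placeholder and the version values follow the documented CURRENT_KAFKA_VERSION convention.]

```properties
# Keep log.dirs exactly as before: the commit log on disk IS the broker's
# data. Pointing an upgraded broker at a different path makes it start
# empty and re-replicate everything from its peers.
log.dirs=/var/kafka-logs

# During the rolling bounce, pin these to the old version; once every
# broker runs 0.10.2.0, bump them with further rolling restarts, as the
# upgrade documentation describes.
inter.broker.protocol.version=0.9.0
log.message.format.version=0.9.0
```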
Frequent UNKNOWN_MEMBER_ID errors in kafka consumer
Hi,
I have a consumer which implements the new consumer API (0.9.0.1). I see the below errors quite frequently in the consumer application logs:

ERROR [pool-4-thread-5] - o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group audit.consumer.group

Can you please enlighten me about the reason for its occurrence?
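[Editor's note: on 0.9.0.1 this error usually means the member was evicted from the group. Heartbeats are only sent from poll(), so if processing one batch takes longer than session.timeout.ms, the coordinator drops the member and its next offset commit fails with UNKNOWN_MEMBER_ID. A sketch of the knobs involved; group name and values are illustrative, and plain string keys are used so only java.util.Properties is needed.]

```java
import java.util.Properties;

public class ConsumerGroupTuning {
    public static Properties build() {
        Properties props = new Properties();
        props.put("group.id", "audit.consumer.group");
        // Raise the session timeout so slow batches have more headroom
        // before the coordinator evicts the member.
        props.put("session.timeout.ms", "30000");
        // Shrink the per-partition fetch so each poll() returns less data
        // and the consumer gets back to poll() -- and heartbeating --
        // sooner. (0.9 has no max.poll.records; that arrived in 0.10.)
        props.put("max.partition.fetch.bytes", "262144");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```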
Re: Regarding log.retention.bytes config
It is per partition.

On Aug 27, 2016 3:10 AM, "Amit Karyekar" wrote:
> Hi,
>
> We're using Kafka 0.9
>
> Wanted to check whether log.retention.bytes works on per partition basis
> or is it cumulative of all partitions?
>
> Regards,
> Amit
>
> Information contained in this e-mail message is confidential. This e-mail
> message is intended only for the personal use of the recipient(s) named
> above. If you are not an intended recipient, do not read, distribute or
> reproduce this transmission (including any attachments). If you have
> received this email in error, please immediately notify the sender by email
> reply and delete the original message.
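[Editor's note: since the limit is enforced per partition, a topic's total disk footprint scales with its partition count. A sketch of setting the per-topic override with kafka-configs.sh; the ZooKeeper host and topic name are placeholders.]

```shell
# retention.bytes is the per-topic override of log.retention.bytes and is
# likewise applied per partition: a 12-partition topic with a 1 GiB limit
# can occupy roughly 12 GiB across the cluster.
bin/kafka-configs.sh --zookeeper zk1:2181 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config retention.bytes=1073741824
```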
Re: How to Identify Consumers of a Topic?
For kafka version > 0.9, you can use kafka-consumer-groups.sh

On Tue, Aug 9, 2016 at 5:21 AM, Jillian Cocklin <jillian.cock...@danalinc.com> wrote:
> Thanks Derar,
>
> I'll check that out and see if it gives enough information about the
> consumer to track it.
>
> Thanks!
> Jillian
>
> -Original Message-
> From: Derar Alassi [mailto:derar.ala...@gmail.com]
> Sent: Monday, August 08, 2016 3:35 PM
> To: users@kafka.apache.org
> Subject: Re: How to Identify Consumers of a Topic?
>
> I use kafka-consumer-offset-checker.sh to check offsets of consumers and
> along that you get which consumer is attached to each partition.
>
> On Mon, Aug 8, 2016 at 3:12 PM, Jillian Cocklin <jillian.cock...@danalinc.com> wrote:
> >
> > Hello,
> >
> > Our team is using Kafka for the first time and are in the testing
> > phase of getting a new product ready, which uses Kafka as the
> > communications backbone. Basically, a processing unit will consume a
> > message from a topic, do the processing, then produce the output to
> > another topic. Messages get passed back and forth between processors
> > until done.
> >
> > We had an issue last week where an outdated processor was "stealing"
> > messages from a topic, doing incorrect (outdated) processing, and
> > putting it in the next topic. We could not find the rogue processor
> > (aka consumer). We shut down all known consumers of that topic, and
> > it was still happening. We finally gave up and renamed the topic to
> > get around the issue.
> >
> > Is there a Kafka tool we could have used to find the connected
> > consumer in that consumer group? Maybe by name or by IP?
> >
> > Thanks,
> > Jillian
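[Editor's note: a sketch of the suggested command; host and group name are placeholders. On 0.9/0.10-era tooling the describe output lists each partition's owner with a client id and host, which is usually enough to locate a rogue consumer.]

```shell
# New-consumer groups (Kafka >= 0.9); older tool versions require the
# --new-consumer flag alongside --bootstrap-server.
bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --new-consumer --describe --group my-group

# Old ZooKeeper-based consumer groups:
bin/kafka-consumer-groups.sh --zookeeper zk1:2181 \
  --describe --group my-group
```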
UNKNOWN_MEMBER_ID
Hi,
I am using kafka 0.9.0.1 and the corresponding java client for my consumer. I see the below error in my consumer logs:

o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group consumergroup001

Why could this error occur?
What happens after connections.max.idle.ms | Kafka Producer
Hi,
From the document for producer configs: connections.max.idle.ms is the time after which idle connections will be closed. I wish to know what will happen if my connections are idle for long, and after that the producer produces a message? I don't see any exception. How does the producer client handle it?

kafka version: 0.9.0.1

Regards,
Dhiraj
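[Editor's note: as far as I know, the producer's network layer simply reopens a connection on the next send after an idle close, which is why no exception surfaces; the close is transparent to the application. A sketch of the knob; plain string key, illustrative value matching the documented 9-minute default.]

```java
import java.util.Properties;

public class IdleConnectionConfig {
    public static Properties build() {
        Properties props = new Properties();
        // After this many ms of inactivity the client closes the socket.
        // The next send() transparently reconnects (refreshing metadata
        // if needed) before the record goes out -- no error reaches the
        // application, only slightly higher latency on that first send.
        props.put("connections.max.idle.ms", "540000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("connections.max.idle.ms"));
    }
}
```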
Consume data in batches from Kafka
Hi,
Can I consume data in batches from kafka using the old High Level Consumer? Is the new consumer API production ready?
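[Editor's note: on the batching half of the question, the new consumer's poll() already hands back a batch of records per call, so no extra batching layer is needed. A hypothetical sketch against the 0.9-era new-consumer API; broker, group, and topic names are placeholders, and this needs the kafka-clients jar plus a running cluster, so it is illustrative only.]

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "batch-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                // Each poll() returns a batch whose size is bounded by the
                // fetch settings (e.g. max.partition.fetch.bytes on 0.9).
                ConsumerRecords<String, String> batch = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.printf("%s-%d@%d: %s%n", record.topic(),
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```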
Re: Some queries about java api for kafka producer
Thanks a lot

On 13 Apr 2015 06:08, Manoj Khangaonkar <khangaon...@gmail.com> wrote:
> Clarification. My answer applies to the new producer API in 0.8.2.
>
> regards
>
> On Sun, Apr 12, 2015 at 4:00 PM, Manoj Khangaonkar <khangaon...@gmail.com> wrote:
> > Hi,
> >
> > For (1), from the java docs: The producer is *thread safe* and should
> > generally be shared among all threads for best performance.
> >
> > (2): (1) implies no pool is necessary.
> >
> > regards
> >
> > On Sun, Apr 12, 2015 at 12:38 AM, dhiraj prajapati <dhirajp...@gmail.com> wrote:
> > > Hi,
> > > I want to send data to apache kafka using the java api of the kafka
> > > producer. The data will be in high volume, of the order of more than 5
> > > thousand messages per second. Please help me with the following queries:
> > >
> > > 1. Should I create only one producer object for the entire app and use
> > > the same object to send all the messages and then close the producer in
> > > the end? Or should I create a producer object for every message to be
> > > sent and close the producer connection after sending each message?
> > >
> > > 2. Does the producer api inherently use a pool of Producer objects? If
> > > yes, what is the default size of the pool and is it configurable?
> > >
> > > Thanks in advance,
> > > Dhiraj Prajapati
>
> --
> http://khangaonkar.blogspot.com/
Some queries about java api for kafka producer
Hi,
I want to send data to apache kafka using the java api of the kafka producer. The data will be in high volume, of the order of more than 5 thousand messages per second. Please help me with the following queries:

1. Should I create only one producer object for the entire app and use the same object to send all the messages and then close the producer in the end? Or should I create a producer object for every message to be sent and close the producer connection after sending each message?

2. Does the producer api inherently use a pool of Producer objects? If yes, what is the default size of the pool and is it configurable?

Thanks in advance,
Dhiraj Prajapati
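[Editor's note: to illustrate the accepted answer in the thread above -- one producer shared by the whole application -- here is a hypothetical sketch with the Java client. Broker address and topic are placeholders, and it requires the kafka-clients jar, so it is illustrative only. KafkaProducer is documented as thread safe; sharing one instance lets it batch sends from all threads over a single set of connections, which matters at thousands of messages per second.]

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // One producer for the entire application, shared by all threads.
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 8; i++) {
            final int id = i;
            pool.submit(new Runnable() {
                public void run() {
                    // send() is asynchronous; records from every thread are
                    // batched together by the shared producer instance.
                    producer.send(new ProducerRecord<String, String>(
                            "my-topic", "key-" + id, "value-" + id));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Close once, at application shutdown -- never per message.
        producer.close();
    }
}
```

Creating and closing a producer per message would pay connection-setup and metadata-fetch costs on every send, which is exactly what the shared instance avoids.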