How to use correlation ID?
Hello,

I would like to use a correlation ID in a microservices REST application, but I am not sure how the value should be handled.

Say a user opens the page example.com:80/register?email=n...@example.com. The request is assigned correlationId="registerRequest:n...@example.com" and is passed to microservice A, then to microservice B, preserving the correlationId. Service B then creates a Kafka message {id:"n...@example.com"}. Should that message carry correlationId="registerRequest:n...@example.com", or rather a unique ID of its own?

I understand that the correlationId is meant to correlate a Kafka request with its Kafka response. However, it is not clear to me whether it should be bound in any way to the larger "request handling" scope and its correlationId.

Thanks in advance for any responses :)

Regards
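The thread above does not settle the question, but one widely used convention is: reuse the caller's correlation id when one arrives, and mint a fresh one only at the system's entry point, so the HTTP request, the hops through services A and B, and the resulting Kafka message all share a single id. Below is a minimal sketch of that convention; the header name and helper class are assumptions for illustration, not something from the thread:

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical helper: reuse an incoming correlation id so every hop in the
// request chain (including any Kafka message produced along the way) shares
// one id; only the entry point mints a new one.
class CorrelationIds {
    static final String HEADER = "X-Correlation-Id";

    static String resolve(Map<String, String> incomingHeaders) {
        String existing = incomingHeaders.get(HEADER);
        if (existing != null && !existing.isEmpty()) {
            return existing;                 // propagate the caller's id
        }
        return UUID.randomUUID().toString(); // entry point: mint a fresh one
    }
}
```

Under this convention, service B would stamp the same id on its Kafka message (for example as a record header), keeping any low-level request/response matching id separate from the end-to-end one.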
Re: Error while fetching metadata with correlation id 3
Please check https://github.com/xerial/snappy-java for how to build / install snappy-java.

On Thu, Dec 28, 2017 at 5:29 AM, Debraj Manna <subharaj.ma...@gmail.com> wrote:
> Hi
>
> I am seeing a warning like below and my kafka java producer client is not
> able to write to kafka broker. (Kafka version 0.10.0 both client & server)
>
> WARN Error while fetching metadata with correlation id 3 :
> {abcdef=LEADER_NOT_AVAILABLE}
>
> [snip]
Error while fetching metadata with correlation id 3
Hi

I am seeing a warning like below and my kafka java producer client is not able to write to kafka broker. (Kafka version 0.10.0, both client & server)

WARN Error while fetching metadata with correlation id 3 : {abcdef=LEADER_NOT_AVAILABLE}

- OS - 14.04.1-Ubuntu
- Java - 8

In kafka server.log I am seeing an exception like below. I am using a single-node kafka broker and zookeeper running on the same host.

[2017-12-28 12:35:30,515] ERROR [Replica Manager on Broker 0]: Error processing append operation on partition Topic3-DC0P6PI-0 (kafka.server.ReplicaManager)
java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
        at java.lang.Runtime.loadLibrary0(Runtime.java:870)
        at java.lang.System.loadLibrary(System.java:1122)
        at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:178)
        at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:152)
        at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
        at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
        at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readLong(DataInputStream.java:416)
        at kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
        at kafka.message.ByteBufferMessageSet$$anon$1.liftedTree2$1(ByteBufferMessageSet.scala:107)
        at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:105)
        at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
        at kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:356)
        at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
        at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
        at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
        at kafka.log.Log.liftedTree1$1(Log.scala:339)
        at kafka.log.Log.append(Log.scala:338)
        at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
        at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
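For reference, the UnsatisfiedLinkError above means the JVM cannot load snappy's native library, so any message set compressed with snappy fails when the broker reads it back. Besides building/installing snappy-java as suggested in the reply, one way to confirm the diagnosis is to switch the producer to a pure-JVM codec; a sketch using the standard producer config key (the helper class itself is hypothetical):

```java
import java.util.Properties;

// Sketch: rule out the snappy native library as the culprit by switching
// the producer's compression codec. "compression.type" is the standard
// producer config key in the 0.10.x Java client; gzip is pure-JVM and
// does not depend on a native library.
class ProducerCompressionConfig {
    static Properties withoutSnappy(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("compression.type", "gzip");
        return props;
    }
}
```

If the errors disappear with gzip, the snappy native library on that host is the problem, not the broker or topic configuration.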
KAFKA (0.10.0.0) - Error while fetching metadata with correlation id
Hi,

We are running KAFKA 0.10.0.0, but at times I see the below error while starting STORM topologies.

org.apache.kafka.clients.NetworkClient WARN - Error while fetching metadata with correlation id 325376 : {aceb3102-62d9-4809-b01b-6c69ebacbc90=UNKNOWN}

Looks like the KAFKA consumer is not able to fetch metadata for the topic, and when I describe the topic I see the below configuration:

Topic: aceb3102-62d9-4809-b01b-6c69ebacbc90  PartitionCount: 1  ReplicationFactor: 2  Configs:
    Topic: aceb3102-62d9-4809-b01b-6c69ebacbc90  Partition: 0  Leader: -1  Replicas: 1,2  Isr:

It's strange to see Leader: -1 and no in-sync replica in the ISR set. Not sure when KAFKA goes into that state, or am I missing anything here?

Thanks,
Abhishek Barot
Re: java.lang.IllegalStateException: Correlation id for response () does not match request ()
Hi Mickael,

This looks to be the same as KAFKA-4669. In theory, this should never happen, and it's unclear when/how it can happen. Not sure if someone has investigated it in more detail.

Ismael

On Mon, Mar 6, 2017 at 5:15 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> Hi,
>
> In one of our clusters, some of our clients occasionally see this
> exception:
> java.lang.IllegalStateException: Correlation id for response (4564)
> does not match request (4562)
>
> [snip]
java.lang.IllegalStateException: Correlation id for response () does not match request ()
Hi,

In one of our clusters, some of our clients occasionally see this exception:

java.lang.IllegalStateException: Correlation id for response (4564) does not match request (4562)
        at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
        at java.lang.Thread.run(Unknown Source)

We've also seen it from consumer poll() and commit().

Usually the response's correlation id is off by just 1 or 2 (like above), but we've also seen it off by a few hundred:

java.lang.IllegalStateException: Correlation id for response (742) does not match request (174)
        at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1027)

When this happens, all subsequent responses are also shifted:

java.lang.IllegalStateException: Correlation id for response (743) does not match request (742)
java.lang.IllegalStateException: Correlation id for response (744) does not match request (743)
java.lang.IllegalStateException: Correlation id for response (745) does not match request (744)
java.lang.IllegalStateException: Correlation id for response (746) does not match request (745)
...

It's easy to discard and recreate the consumer instance to recover; however, we can't do that with the producer, as the error occurs in the Sender thread.

Our cluster and our clients are running Kafka 0.10.0.1. Under which circumstances would such an error happen? Even with logging set to TRACE, we can't spot anything suspicious shortly before the issue. Is there any data we should try to capture when this happens?

Thanks!
Re: Correlation Id errors for both console producer and consumer
Anybody ever seen this before? Anybody have any ideas as to where I can start troubleshooting?

From: Zac Harvey <zac.har...@welltok.com>
Sent: Tuesday, January 17, 2017 4:40:35 PM
To: users@kafka.apache.org
Subject: Re: Correlation Id errors for both console producer and consumer

Hi Jeff,

Versions:
Kafka: kafka_2.11-0.10.0.0
ZK: zookeeper-3.4.6

Let me know if you need any more details/info. Thanks!

-Zac

From: Jeff Widman <j...@netskope.com>
Sent: Tuesday, January 17, 2017 4:38:07 PM
To: users@kafka.apache.org
Subject: Re: Correlation Id errors for both console producer and consumer

What versions of Kafka and Zookeeper are you using?

[snip]
Re: Correlation Id errors for both console producer and consumer
Hi Jeff,

Versions:
Kafka: kafka_2.11-0.10.0.0
ZK: zookeeper-3.4.6

Let me know if you need any more details/info. Thanks!

-Zac

From: Jeff Widman <j...@netskope.com>
Sent: Tuesday, January 17, 2017 4:38:07 PM
To: users@kafka.apache.org
Subject: Re: Correlation Id errors for both console producer and consumer

What versions of Kafka and Zookeeper are you using?

On Tue, Jan 17, 2017 at 11:57 AM, Zac Harvey <zac.har...@welltok.com> wrote:
> I have 2 Kafkas backed by 3 ZK nodes. I want to test the Kafka nodes by
> running the kafka-console-producer and -consumer locally on each node.
>
> [snip]
Re: Correlation Id errors for both console producer and consumer
What versions of Kafka and Zookeeper are you using?

On Tue, Jan 17, 2017 at 11:57 AM, Zac Harvey <zac.har...@welltok.com> wrote:
> I have 2 Kafkas backed by 3 ZK nodes. I want to test the Kafka nodes by
> running the kafka-console-producer and -consumer locally on each node.
>
> [snip]
Correlation Id errors for both console producer and consumer
I have 2 Kafkas backed by 3 ZK nodes. I want to test the Kafka nodes by running the kafka-console-producer and -consumer locally on each node.

So I SSH into one of my Kafka brokers using 2 different terminals. In terminal #1 I run the consumer like so:

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper a.b.c.d:2181 --topic test1

where a.b.c.d is the private IP of one of my 3 ZK nodes. Then in terminal #2 I run the producer like so:

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

I am able to start both the consumer and producer just fine without any issues.

However, in the producer terminal, if I "fire" a message at the test1 topic by entering some text (such as "hello") and hitting the ENTER key, I immediately begin seeing this:

[2017-01-17 19:45:57,353] WARN Error while fetching metadata with correlation id 0 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,372] WARN Error while fetching metadata with correlation id 1 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,477] WARN Error while fetching metadata with correlation id 2 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,582] WARN Error while fetching metadata with correlation id 3 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

...and it keeps going!

And, in the consumer terminal, even though I don't get any errors when I start the consumer, after about 30 seconds I get the following warning message:

[2017-01-17 19:46:07,292] WARN Fetching topic metadata with correlation id 1 for topics [Set(test1)] from broker [BrokerEndPoint(1,ip-x-y-z-w.ec2.internal,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Interestingly, ip-x-y-z-w.ec2.internal is the private DNS for the other Kafka broker, so perhaps this is some kind of failure during interbroker communication?

Any ideas as to what is going on here and what I can do to troubleshoot?
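One frequent cause of LEADER_NOT_AVAILABLE loops like this, though it is not confirmed in the thread, is the broker advertising a hostname or address that clients (or the other broker) cannot resolve or reach. A server.properties fragment worth checking on each broker; the host value below is a placeholder, and the exact setting names are the standard 0.10.x broker configs:

```properties
# The address the broker publishes in metadata responses. It must be
# resolvable and reachable from every producer, consumer, and fellow
# broker; otherwise clients fail to find the partition leader even
# though the broker itself is healthy.
advertised.listeners=PLAINTEXT://broker1.internal.example.com:9092
```

If the console clients work with localhost but fail when the metadata points them at the EC2-internal name of the other broker, this setting is the first place to look.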
Re: WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
Sorry, my bad. I should not have added the "?" at the end.

On 6 April 2016 at 11:01, Ratha v <vijayara...@gmail.com> wrote:
> Hi all;
> When I run the following command with kafka 0.9.0.1, I get the warnings
> below [1]. Can you please tell me what is wrong with my topics? (I'm
> talking to the kafka broker which runs in ec2)
>
> #./kafka-console-consumer.sh --new-consumer --bootstrap-server
> kafka.xx.com:9092 --topic MY_TOPIC?
>
> [snip]

--
-Ratha
http://vvratha.blogspot.com/
WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
Hi all;

When I run the following command with kafka 0.9.0.1, I get the warnings below [1]. Can you please tell me what is wrong with my topics? (I'm talking to the kafka broker which runs in ec2)

#./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC?

[1]
[2016-04-06 10:57:45,839] WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,066] WARN Error while fetching metadata with correlation id 3 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,188] WARN Error while fetching metadata with correlation id 5 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)
[2016-04-06 10:57:46,311] WARN Error while fetching metadata with correlation id 7 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

-Ratha
http://vvratha.blogspot.com/
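As Ratha's own follow-up notes, the '?' is the problem: Kafka topic names may contain only ASCII alphanumerics plus '.', '_' and '-' (with a length cap of 249 characters in modern brokers; older releases allowed slightly more). A sketch of that rule, using a hypothetical helper class rather than Kafka's actual validator:

```java
import java.util.regex.Pattern;

// Sketch of Kafka's topic-name rule: ASCII alphanumerics plus '.', '_'
// and '-'. '?' is outside this set, which is why "MY_TOPIC?" is rejected
// with INVALID_TOPIC_EXCEPTION. The 249-character cap reflects modern
// brokers; older releases used a slightly larger limit.
class TopicNames {
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isValid(String name) {
        return !name.isEmpty()
                && name.length() <= 249
                && !name.equals(".") && !name.equals("..")
                && LEGAL.matcher(name).matches();
    }
}
```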
Re: Correlation id
Thanks Tejas! That's very helpful.

Tim

On Mon, Aug 19, 2013 at 11:40 PM, Tejas Patil <tejas.patil...@gmail.com> wrote:
> Multiple produce requests are sent asynchronously over the same socket.
> Suppose you send 2 requests and get back a single response; how do you
> figure out which of those 2 requests it corresponds to? The correlation id
> helps here. AFAIK, the correlation id is added to produce requests, and the
> broker uses the same id in its response so that the producer can keep track
> of its requests. The correlation id also helps in debugging issues, as you
> can now uniquely identify requests across producer and broker logs.
>
> On Mon, Aug 19, 2013 at 11:01 PM, Timothy Chen <tnac...@gmail.com> wrote:
>> Hi,
>>
>> This is probably a very obvious question, but I cannot find the answer
>> for it. What does the correlation id mean in a producer request?
>>
>> Tim
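Tejas's explanation can be sketched as a queue of in-flight ids: requests are pipelined in send order over one socket, so the oldest outstanding correlation id must match the id carried by the next response. This is a simplified model of what the Java client's NetworkClient does, not its actual code, and it also shows why the "Correlation id for response () does not match request ()" errors in the earlier threads stay shifted once the queue and the responses fall out of step:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of correlation-id matching on a pipelined connection:
// each send enqueues its id; each response must carry the id at the head
// of the queue, otherwise the client cannot tell which request it answers.
class InFlightRequests {
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    int send() {                          // called when a request is written
        int id = nextCorrelationId++;
        inFlight.addLast(id);
        return id;
    }

    void onResponse(int responseCorrelationId) {
        int expected = inFlight.pollFirst();   // oldest outstanding request
        if (expected != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response ("
                    + responseCorrelationId + ") does not match request ("
                    + expected + ")");
        }
    }
}
```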
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Jun. Yes. I am using 0.8.0-beta1.

On Fri, Jun 28, 2013 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
> In this case, the warning is for the fetch request from the replica, not regular consumers. I assume this is the log for the restarted broker. Is this transient? Are you using 0.8.0-beta1?
>
> Thanks,
> Jun
>
> On Fri, Jun 28, 2013 at 10:57 AM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
>> Good morning. I have a cluster of 3 kafka nodes. They were all running at the time. I needed to make a configuration change in the property file and restart kafka. I don't have a broker shutdown tool, but simply used pkill -TERM -u ${KAFKA_USER} -f kafka.Kafka. That suddenly caused the exception. How do I avoid this issue in the future? What's the right way to shut down kafka to prevent the NotLeader exception?
>>
>> Thanks so much in advance,
>> Vadim
>>
>> [2013-06-28 10:46:53,281] WARN [KafkaApi-1] Fetch request with correlation id 1171435 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition [meetme,0] on broker 1 (kafka.server.KafkaApis)
>> [2013-06-28 10:46:53,282] WARN [KafkaApi-1] Fetch request with correlation id 1171436 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition [meetme,0] on broker 1 (kafka.server.KafkaApis)
>> [2013-06-28 10:46:53,448] WARN [ReplicaFetcherThread-0-2], error for partition [meetme,0] to broker 2 (kafka.server.ReplicaFetcherThread)
>> kafka.common.NotLeaderForPartitionException
>>     at sun.reflect.GeneratedConstructorAccessor2.newInstance(Unknown Source)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>>     at java.lang.Class.newInstance0(Class.java:355)
>>     at java.lang.Class.newInstance(Class.java:308)
>>     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
>>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
>>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
>>     at kafka.utils.Logging$class.warn(Logging.scala:88)
>>     at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
>>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:156)
>>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
>>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
>>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
>>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> [2013-06-28 10:46:53,476] INFO Closing socket connection to /10.98.21.112. (kafka.network.Processor)
>> [2013-06-28 10:46:53,686] INFO Closing socket connection to /10.98.21.112. (kafka.network.Processor)
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Is that warning transient? If not, take a look at the state-change.log; it should include all the leaderAndIsr requests. See whether the leader received by this broker matches what's determined by the controller (in the controller.log on the controller broker).

Thanks,
Jun

On Fri, Jun 28, 2013 at 11:26 PM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
> Jun. Yes. I am using 0.8.0-beta1.
>
> On Fri, Jun 28, 2013 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
>> In this case, the warning is for the fetch request from the replica, not regular consumers. I assume this is the log for the restarted broker. Is this transient? Are you using 0.8.0-beta1? [...]
Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Good morning. I have a cluster of 3 kafka nodes. They were all running at the time. I needed to make a configuration change in the property file and restart kafka. I don't have a broker shutdown tool, but simply used pkill -TERM -u ${KAFKA_USER} -f kafka.Kafka. That suddenly caused the exception below. How do I avoid this issue in the future? What's the right way to shut down kafka to prevent the NotLeader exception?

Thanks so much in advance,
Vadim

[2013-06-28 10:46:53,281] WARN [KafkaApi-1] Fetch request with correlation id 1171435 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition [meetme,0] on broker 1 (kafka.server.KafkaApis)
[2013-06-28 10:46:53,282] WARN [KafkaApi-1] Fetch request with correlation id 1171436 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition [meetme,0] on broker 1 (kafka.server.KafkaApis)
[2013-06-28 10:46:53,448] WARN [ReplicaFetcherThread-0-2], error for partition [meetme,0] to broker 2 (kafka.server.ReplicaFetcherThread)
kafka.common.NotLeaderForPartitionException
    at sun.reflect.GeneratedConstructorAccessor2.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at java.lang.Class.newInstance0(Class.java:355)
    at java.lang.Class.newInstance(Class.java:308)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
    at kafka.utils.Logging$class.warn(Logging.scala:88)
    at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:156)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2013-06-28 10:46:53,476] INFO Closing socket connection to /10.98.21.112. (kafka.network.Processor)
[2013-06-28 10:46:53,686] INFO Closing socket connection to /10.98.21.112. (kafka.network.Processor)
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
I want to clarify that I restarted only one kafka node; all others were running and did not require a restart.

On Fri, Jun 28, 2013 at 10:57 AM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
> Good morning. I have a cluster of 3 kafka nodes. They were all running at the time. I needed to make a configuration change in the property file and restart kafka. I don't have a broker shutdown tool, but simply used pkill -TERM -u ${KAFKA_USER} -f kafka.Kafka. That suddenly caused the exception. How do I avoid this issue in the future? What's the right way to shut down kafka to prevent the NotLeader exception? [...]
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Getting kafka.common.NotLeaderForPartitionException for a time after a node is brought back online (especially after a short downtime) is normal - the consumers have not yet completely picked up the new leader information. It should settle shortly.

--
Dave DeMaagd
ddema...@linkedin.com | 818 262 7958

(vkeylis2...@gmail.com - Fri, Jun 28, 2013 at 11:08:46AM -0700)
> I want to clarify that I restarted only one kafka node; all others were running and did not require a restart.
>
> On Fri, Jun 28, 2013 at 10:57 AM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
>> Good morning. I have a cluster of 3 kafka nodes. [...]
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
David. What is the expected time frame for the exception to continue? An hour has passed since the short downtime and I still see the exception in the kafka service logs.

Thanks,
Vadim

On Fri, Jun 28, 2013 at 11:25 AM, David DeMaagd <ddema...@linkedin.com> wrote:
> Getting kafka.common.NotLeaderForPartitionException for a time after a node is brought back online (especially after a short downtime) is normal - the consumers have not yet completely picked up the new leader information. It should settle shortly. [...]
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Unless I'm misreading something, that is controlled by the topic.metadata.refresh.interval.ms variable (defaults to 10 minutes), and I've not seen it run longer than that (unless there were other problems going on besides that). I would check the JMX values for things under kafka.server:type=ReplicaManager, particularly UnderReplicatedPartitions and possibly the ISR Expand/Shrink values - those could indicate a problem on the brokers that is preventing things from settling down completely. You might also look at whether you are doing any heavy GCs (which can cause zookeeper connection issues, which would then complicate the ISR election).

--
Dave DeMaagd
ddema...@linkedin.com | 818 262 7958

(vkeylis2...@gmail.com - Fri, Jun 28, 2013 at 11:32:42AM -0700)
> David. What is the expected time frame for the exception to continue? An hour has passed since the short downtime and I still see the exception in the kafka service logs. [...]
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Joel. My problem, after your explanation, is that the leader for some reason did not get elected, and the exception has been thrown for hours now. What is the best way to force leader election for that partition?

Vadim

On Fri, Jun 28, 2013 at 12:26 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Just wanted to clarify: topic.metadata.refresh.interval.ms would apply to producers - and mainly with ack = 0. (If ack = 1, then a metadata request would be issued on this exception, although even with ack 0 it is useful to have the metadata refresh for refreshing information about how many partitions are available.)
>
> For replica fetchers (Vadim's case) the exceptions would persist until the new leader for the replica in question is elected. It should not take too long. When the leader is elected, the controller will send out an RPC to the new leaders and followers and the above exceptions will go away.
>
> Also, to answer your question: the right way to shut down an 0.8 cluster is to use controlled shutdown. That will not eliminate the exceptions, but they are more for informative purposes and are non-fatal (i.e., the logging can probably be improved a bit).
>
> On Fri, Jun 28, 2013 at 11:47 AM, David DeMaagd <ddema...@linkedin.com> wrote:
>> Unless I'm misreading something, that is controlled by the topic.metadata.refresh.interval.ms variable (defaults to 10 minutes), and I've not seen it run longer than that. [...]
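For reference, Joel's controlled-shutdown advice maps to broker-side settings in later Kafka releases. A sketch of the relevant server.properties keys, not a drop-in config - these property names come from newer Kafka versions and may not exist in 0.8.0-beta1, where a separate admin tool handled controlled shutdown:

```
# server.properties - controlled shutdown sketch (assumes a Kafka
# version where these keys exist; in modern brokers
# controlled.shutdown.enable defaults to true)
controlled.shutdown.enable=true
# retry leadership migration a few times before shutting down anyway
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
```

With this enabled, a SIGTERM (as from pkill -TERM) lets the broker move partition leadership away before exiting, which is what avoids the flood of "Leader not local" warnings.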
Re: Fetch request with correlation id 1171437 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition
Leader election occurs when brokers are bounced or lose their zookeeper registration. Do you have a state-change.log on your brokers? Also, can you see what's in the following zk paths:

get /brokers/topics/meetme
get /brokers/topics/meetme/partitions/0/state

On Fri, Jun 28, 2013 at 1:40 PM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
> Joel. My problem, after your explanation, is that the leader for some reason did not get elected, and the exception has been thrown for hours now. What is the best way to force leader election for that partition?
>
> Vadim
>
> On Fri, Jun 28, 2013 at 12:26 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> Just wanted to clarify: topic.metadata.refresh.interval.ms would apply to producers - and mainly with ack = 0. For replica fetchers (Vadim's case) the exceptions would persist until the new leader for the replica in question is elected. [...]
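The second znode path above holds a small JSON payload describing the partition's state. A hedged sketch of checking it for an elected leader (field names follow Kafka's ZK partition-state format as I understand it; `has_leader` is a made-up helper, and the example values are invented):

```python
import json

def has_leader(state_json: str) -> bool:
    """Check a partition-state znode payload for an elected leader.

    Per Kafka's ZK partition-state JSON, "leader" holds the broker id
    currently serving the partition; -1 means no leader is elected.
    """
    return json.loads(state_json)["leader"] != -1

# Example payload shape for /brokers/topics/meetme/partitions/0/state
# (values made up for illustration):
example = '{"controller_epoch": 4, "leader": 1, "version": 1, "leader_epoch": 7, "isr": [1, 2]}'
print(has_leader(example))  # True
```

If the state node shows leader -1 (or the leader never changes after the restart), that matches Vadim's symptom of the NotLeaderForPartitionException persisting for hours.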