[
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897746#comment-15897746
]
Edoardo Comar commented on KAFKA-4669:
--------------------------------------
We've seen this same exception using 10.0.1 clients against 10.0.1 brokers -
without doing a {{KafkaProducer.flush()}}
We've seen it happening during
{{KafkaConsumer.poll()}}
{{KafkaConsumer.commitSync()}}
and
in the network I/O of a KafkaProducer that is just doing sends.eg.
{code}
java.lang.IllegalStateException: Correlation id for response (4564)
does not match request (4562)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Unknown Source)
{code}
{code}
java.lang.IllegalStateException: Correlation id for response (742) does not
match request (174)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426)
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059)
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1027)
{code}
Usually the response's correlation id is off by just 1 or 2 but we've also seen
it off by a few hundreds.
When this happens, all subsequent responses are also shifted:
{code}
java.lang.IllegalStateException: Correlation id for response (743)
does not match request (742)
java.lang.IllegalStateException: Correlation id for response (744)
does not match request (743)
java.lang.IllegalStateException: Correlation id for response (745)
does not match request (744)
{code}
> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws
> exception
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives. If an
> exception is thrown after inFlightRequests.completeNext(source), then the
> corresponding RecordBatch's done will never get called, and
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case. First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21]
> (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught
> error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does
> not match request (703764)
> at
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> at
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
> at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
> at
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
> at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> client code
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)