[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cheng Ju updated KAFKA-4669:
----------------------------
Description:
NetworkClient.handleCompletedReceives has no try/catch. If an exception is
thrown after inFlightRequests.completeNext(source) has removed the request
from the in-flight queue, the corresponding RecordBatch's done() is never
called, and KafkaProducer.flush blocks forever waiting on that RecordBatch.
I've checked the 0.10 code and believe the bug also exists in the 0.10 versions.
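The failure mode can be illustrated with a minimal, self-contained sketch. It
is not Kafka's code: FlushHangSketch, InFlight, handleUnguarded and
handleGuarded are hypothetical names, and a plain CountDownLatch stands in for
the latch behind ProduceRequestResult. The unguarded handler dequeues the
request before validating the response, so an exception leaves the latch
untouched. The guarded variant releases it in a finally block.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for the client internals; not Kafka's real classes.
class FlushHangSketch {

    /** A pending request; the latch plays the role of the batch's completion signal. */
    static final class InFlight {
        final int correlationId;
        final CountDownLatch done = new CountDownLatch(1);
        InFlight(int correlationId) { this.correlationId = correlationId; }
    }

    private final Deque<InFlight> inFlight = new ArrayDeque<>();

    InFlight send(int correlationId) {
        InFlight request = new InFlight(correlationId);
        inFlight.addLast(request);
        return request;
    }

    private static void correlate(InFlight request, int responseId) {
        if (request.correlationId != responseId) {
            throw new IllegalStateException("Correlation id for response (" + responseId
                    + ") does not match request (" + request.correlationId + ")");
        }
    }

    /** Unguarded: the request is dequeued first, so an exception orphans its latch. */
    void handleUnguarded(int responseId) {
        InFlight request = inFlight.pollFirst(); // analogous to completeNext(source)
        correlate(request, responseId);          // may throw
        request.done.countDown();                // skipped when correlate throws
    }

    /** Guarded: the latch is released even when processing throws. */
    void handleGuarded(int responseId) {
        InFlight request = inFlight.pollFirst();
        try {
            correlate(request, responseId);
        } finally {
            // A real fix would also record the failure on the batch.
            request.done.countDown();
        }
    }

    /** Returns true if a bounded wait on the latch succeeds after a mismatched response. */
    static boolean flushCompletes(boolean guarded) {
        FlushHangSketch client = new FlushHangSketch();
        InFlight request = client.send(703764);
        try {
            if (guarded) client.handleGuarded(703766);
            else client.handleUnguarded(703766);
        } catch (IllegalStateException e) {
            // The I/O thread would log this as an uncaught error and keep running.
        }
        try {
            // flush() waits without a timeout; a bounded wait keeps the sketch finite.
            return request.done.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded completes: " + flushCompletes(false)); // false
        System.out.println("guarded completes: " + flushCompletes(true));    // true
    }
}
```

In the unguarded path the only reference to the pending request is lost once
the exception propagates, so nothing can complete it later. That matches the
hang described above.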
A real case: first, a correlation id mismatch exception is thrown:
13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
    at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)
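For context, the check that fails in the trace above compares the correlation
id in the response header with that of the oldest in-flight request on the
same connection. The sketch below shows that idea only. CorrelationSketch and
its methods are hypothetical names, and the queue holds bare ids rather than
real requests.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of in-order response matching; not Kafka's classes.
class CorrelationSketch {
    private final Deque<Integer> inFlightIds = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    /** Registers a request and returns the correlation id assigned to it. */
    int send() {
        int id = nextCorrelationId++;
        inFlightIds.addLast(id);
        return id;
    }

    /** Matches a response against the oldest pending request, which is removed first. */
    int receive(int responseId) {
        Integer expected = inFlightIds.pollFirst();
        if (expected == null) {
            throw new IllegalStateException("No in-flight request for response (" + responseId + ")");
        }
        if (expected != responseId) {
            throw new IllegalStateException("Correlation id for response (" + responseId
                    + ") does not match request (" + expected + ")");
        }
        return expected;
    }

    /** Sends three requests, then delivers a response out of order. */
    static String outOfOrderMessage() {
        CorrelationSketch connection = new CorrelationSketch();
        connection.send(); // id 0
        connection.send(); // id 1
        connection.send(); // id 2
        connection.receive(0); // matches the oldest pending request
        try {
            connection.receive(2); // request 1 is the oldest pending one
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outOfOrderMessage());
    }
}
```

Note that the pending request has already been removed from the queue when
the mismatch is detected, which is the ordering this issue is about.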
Then jstack shows the thread is hanging on:
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
    at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
client code
was:
NetworkClient.handleCompletedReceives has no try/catch. If an exception is
thrown after inFlightRequests.completeNext(source), the corresponding
RecordBatch's done() is never called, and KafkaProducer.flush hangs on that
RecordBatch.
I've checked the 0.10 code and believe the bug also exists in the 0.10 versions.
> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws
> exception
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju