[
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991488#comment-15991488
]
Shannon Carey commented on KAFKA-4669:
--------------------------------------
We encountered this exception with our 0.9.0.1 client too, but it was preceded
by another exception:
{code}
2017-05-01 10:38:02:866 thread=kafka-producer-network-thread | producer-1, level=ERROR, logger=org.apache.kafka.clients.producer.internals.Sender, , message="Uncaught error in kafka producer I/O thread: "
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
2017-05-01 10:38:04:847 thread=kafka-producer-network-thread | producer-1, level=ERROR, logger=org.apache.kafka.clients.producer.internals.Sender, , message="Uncaught error in kafka producer I/O thread: "
java.lang.IllegalStateException: Correlation id for response (3623413) does not match request (3623406)
    at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
{code}
Assuming there is no workaround, it looks like we will need to use metrics to
detect this situation and react by restarting the application.
The producer should probably be fixed so that any unhandled exception in a
Kafka internal thread is surfaced to user code as an exception, so that user
code can react to it instead of the producer silently ceasing to work. Or, if
the problem is transient, the thread should recover.
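
Until the producer itself surfaces such failures, one application-level way to avoid blocking forever is to put a time limit on the blocking call. The following is a minimal sketch using only the JDK: the class name BoundedFlush and the method runWithTimeout are made up for illustration, and the Runnable stands in for a call such as producer.flush(). It has not been tested against a real Kafka client. A timeout only tells the application that something is wrong; the producer would still have to be recreated or the application restarted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedFlush {

    /**
     * Runs a blocking action on a helper thread and waits at most timeoutMs.
     * Returns true if the action completed in time, false if it timed out
     * or the waiting thread was interrupted.
     */
    static boolean runWithTimeout(Runnable blockingAction, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-flush");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(blockingAction);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the helper thread
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("blocking action failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a flush() that never returns: nobody counts this latch down.
        CountDownLatch neverReleased = new CountDownLatch(1);
        boolean finished = runWithTimeout(() -> {
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200);
        System.out.println("flush finished: " + finished); // prints "flush finished: false"
    }
}
```

A false return value could then drive whatever reaction the application prefers, for example raising an alert or triggering a restart.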
> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws
> exception
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an
> exception is thrown after inFlightRequests.completeNext(source), then the
> corresponding RecordBatch's done will never get called, and
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug exists in the 0.10 versions too.
> A real case: first, a correlation id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>     at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>     at java.lang.Thread.run(Thread.java:745)
> client code
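
The mechanism in the quoted description (an exception thrown before the batch's done is called, leaving a latch that is never counted down) can be reproduced in miniature with the JDK alone. The sketch below is illustrative only: Batch, handleUnguarded and handleGuarded are hypothetical names, not Kafka classes or methods, and the guarded variant is not claimed to be the fix that went into 0.11.0.0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CompletionLatchSketch {

    /** Hypothetical stand-in for a batch whose completion a flush-style caller waits on. */
    static final class Batch {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile RuntimeException error;

        /** Marks the batch complete, optionally carrying a failure. */
        void done(RuntimeException error) {
            this.error = error;
            latch.countDown();
        }

        /** Returns true if done() was called within timeoutMs. */
        boolean await(long timeoutMs) {
            try {
                return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        RuntimeException error() {
            return error;
        }
    }

    /** Unguarded handler: if parsing throws, done() is never reached. */
    static void handleUnguarded(Batch batch, Runnable parseResponse) {
        parseResponse.run();
        batch.done(null);
    }

    /** Guarded handler: the batch is completed either way, carrying the failure. */
    static void handleGuarded(Batch batch, Runnable parseResponse) {
        try {
            parseResponse.run();
            batch.done(null);
        } catch (RuntimeException e) {
            batch.done(e);
        }
    }

    public static void main(String[] args) {
        Runnable badResponse = () -> {
            throw new IllegalStateException("correlation id mismatch");
        };

        Batch stuck = new Batch();
        try {
            handleUnguarded(stuck, badResponse);
        } catch (RuntimeException ignored) {
            // The I/O thread would log "Uncaught error" here and carry on.
        }
        System.out.println("unguarded completed: " + stuck.await(100));   // false

        Batch released = new Batch();
        handleGuarded(released, badResponse);
        System.out.println("guarded completed: " + released.await(100)); // true
    }
}
```

In the guarded variant a waiter is always released and can inspect error() to see why the batch failed, which is the behavior the comment above asks for.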