[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401736#comment-17401736 ]

BurningIce commented on KAFKA-4669:
-----------------------------------


I'm using kafka-clients 2.5.0 and am also encountering this issue on the consumer side.

I enabled TRACE-level logging for Kafka after the error occurred (not before).
It seems that the correlation id in the earlier log line
"Using older server API v7 to send OFFSET_COMMIT ... with correlation id"
(13993458) does not match the request correlation id in the exception message
(13993456); it matches the response correlation id instead.

Do these logs help?
[~rsivaram] 
============================

08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Sending asynchronous auto-commit of offsets {q-app-jvm-1=OffsetAndMetadata{offset=13500194, leaderEpoch=5, metadata=''}}
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Sending OffsetCommit request with {q-app-jvm-1=OffsetAndMetadata{offset=13500194, leaderEpoch=5, metadata=''}} to coordinator apmkafka10:9092 (id: 2147483643 rack: null)
08-19 23:29:13 [pool-21-thread-1] DEBUG o.a.k.c.NetworkClient - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Using older server API v7 to send OFFSET_COMMIT {group_id=cg-apm-dc-wrap-jvm,generation_id=222,member_id=consumer-cg-apm-dc-wrap-jvm-1-c9eec325-434f-4495-b040-7522d5c2ec87,group_instance_id=null,topics=[{name=q-app-jvm,partitions=[{partition_index=1,committed_offset=13500194,committed_leader_epoch=5,committed_metadata=}]}]} with correlation id 13993458 to node 2147483643
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] ERROR c.n.m.k.KafkaTopicConsumer - unexpected error to get message from kafka: Correlation id for response (13993458) does not match request (13993456), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=7, clientId=consumer-cg-apm-dc-wrap-jvm-1, correlationId=13993456)
java.lang.IllegalStateException: Correlation id for response (13993458) does not match request (13993456), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=7, clientId=consumer-cg-apm-dc-wrap-jvm-1, correlationId=13993456)
        at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:943) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:726) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:839) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) ~[kafka-clients-2.5.0.jar:?]
        at com.networkbench.mq.kafka.KafkaTopicConsumer$MQConsumerWorker.run(KafkaTopicConsumer.java:372) [networkbench-mq-2.3.9-kafka_2.0-SNAPSHOT.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
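
The mismatch in the trace above can be illustrated with a simplified model. This is a hypothetical sketch, not Kafka's internal API: it assumes that responses on one connection arrive in the order the requests were sent, so the client compares each response with the oldest in-flight request to that node. As an illustration, it also assumes that OFFSET_COMMIT requests 13993456 and 13993458 were both in flight to the coordinator when the response for 13993458 was read.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of response correlation on a single connection.
// Assumption: requests on one connection are answered in send order, so each
// response is checked against the OLDEST in-flight request.
// Class and method names are illustrative only.
public class CorrelationSketch {
    // Correlation ids of requests sent on this connection, oldest first.
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    /** Records a request with the given correlation id as in flight. */
    void send(int correlationId) {
        inFlight.addLast(correlationId);
    }

    /** Removes the oldest in-flight request and checks that the response belongs to it. */
    int receive(int responseCorrelationId) {
        Integer requestId = inFlight.pollFirst();
        if (requestId == null) {
            throw new IllegalStateException("Response received with no request in flight");
        }
        if (requestId != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + requestId + ")");
        }
        return requestId;
    }

    public static void main(String[] args) {
        CorrelationSketch coordinator = new CorrelationSketch();
        // Ids taken from the trace; treating both as in flight here is an assumption.
        coordinator.send(13993456);
        coordinator.send(13993458);
        try {
            // The first response read carries 13993458, but the oldest request is 13993456.
            coordinator.receive(13993458);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions the program prints the same "Correlation id for response (13993458) does not match request (13993456)" text seen in the exception, which is consistent with the response to 13993456 never being read on that connection.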

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4669
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>            Reporter: Cheng Ju
>            Assignee: Rajini Sivaram
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an
> exception is thrown after inFlightRequests.completeNext(source), then the
> corresponding RecordBatch.done() will never be called, and
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and believe this bug also exists in 0.10 versions.
> A real case: first, a correlation id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>       at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>       at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>       at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread hanging at:
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>       at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>       at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>       at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>       at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>       at java.lang.Thread.run(Thread.java:745)
> client code 
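
The hang described in the quoted issue can be reproduced in miniature. In this hypothetical sketch, `Batch` stands in for a RecordBatch, and its `CountDownLatch` plays the role of the ProduceRequestResult latch that `flush()` waits on in the jstack output. `handleResponseUnsafe` removes the batch from the in-flight queue and then throws, so the latch is never released. `handleResponseSafe` shows one general way to avoid that (complete the removed batch exceptionally before rethrowing); it is an assumption for illustration, not necessarily how the KAFKA-4669 fix was implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the failure mode in KAFKA-4669; names are illustrative,
// not Kafka's internal classes.
public class FlushHangSketch {
    /** Stands in for a batch whose completion a flush() caller waits on. */
    static final class Batch {
        final CountDownLatch done = new CountDownLatch(1);
        volatile Exception error;

        void complete(Exception e) {
            error = e;
            done.countDown();
        }
    }

    private final Deque<Batch> inFlight = new ArrayDeque<>();

    void send(Batch b) {
        inFlight.addLast(b);
    }

    /** Unsafe: the batch is removed first, so a later failure leaves it uncompleted forever. */
    void handleResponseUnsafe(boolean parseFails) {
        Batch b = inFlight.pollFirst();
        if (b == null) throw new IllegalStateException("No request in flight");
        if (parseFails) throw new IllegalStateException("Correlation id mismatch");
        b.complete(null);
    }

    /** Safer: any failure after removal still completes the batch, exceptionally. */
    void handleResponseSafe(boolean parseFails) {
        Batch b = inFlight.pollFirst();
        if (b == null) throw new IllegalStateException("No request in flight");
        try {
            if (parseFails) throw new IllegalStateException("Correlation id mismatch");
            b.complete(null);
        } catch (RuntimeException e) {
            b.complete(e); // release any flush() waiter before propagating
            throw e;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FlushHangSketch client = new FlushHangSketch();

        Batch lost = new Batch();
        client.send(lost);
        try { client.handleResponseUnsafe(true); } catch (IllegalStateException ignored) { }
        // A real flush() would block here indefinitely; the timeout makes the hang visible.
        System.out.println("unsafe batch completed: " + lost.done.await(100, TimeUnit.MILLISECONDS));

        Batch failed = new Batch();
        client.send(failed);
        try { client.handleResponseSafe(true); } catch (IllegalStateException ignored) { }
        System.out.println("safe batch completed: " + failed.done.await(100, TimeUnit.MILLISECONDS)
                + ", error: " + failed.error.getMessage());
    }
}
```

With the unsafe handler the timed `await` returns false, mirroring the thread stuck in `CountDownLatch.await`; with the safe handler it returns true and the waiter sees the error instead of hanging.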



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
