Evan Pollan created KAFKA-5875:
----------------------------------

             Summary: Consumer group repeatedly fails to join, even across JVM 
restarts: BufferUnderflowException reading the {{version}} field in the 
consumer protocol header
                 Key: KAFKA-5875
                 URL: https://issues.apache.org/jira/browse/KAFKA-5875
             Project: Kafka
          Issue Type: Bug
            Reporter: Evan Pollan


I've seen this maybe once a month in our large cluster of Kubernetes-based 
Kafka consumers & producers.  Every once in a while, a consumer in a Kubernetes 
"pod" gets this error trying to join a consumer group:

{code}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
 version : 0.11.0.0","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
 commitId : cb8625948210849f","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking
 previously assigned partitions [] for group 
conv-fetch-jobs-runner-for-internal","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining
 group conv-fetch-jobs-runner-for-internal","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully
 joined group conv-fetch-jobs-runner-for-internal with generation 
17297","exception":""}
{"errorType":"Error reading field 'version': 
java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException:
 Error reading field 'version': java.nio.BufferUnderflowException\n\tat 
org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat
 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat
 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat
 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat
 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat
 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat
 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat
 
com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat
 
com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat
 
com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat
 
com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat
 
java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
{code}

Pardon the log format -- these get sucked into logstash, thus the JSON.

Here's the raw stacktrace: 
{code}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)
        at com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)
        at com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)
        at com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)
        at java.lang.Thread.run(Thread.java:745)
{code}
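
For what it's worth, the immediate mechanics are easy to reproduce in 
isolation: the assignment deserializer reads a 16-bit {{version}} field before 
anything else, so an empty (or undersized) member-assignment buffer underflows 
on the very first read. A minimal hand-rolled sketch (not the actual 
{{ConsumerProtocol}} code; the class and names here are illustrative):

{code}
import java.nio.ByteBuffer;

public class UnderflowRepro {
    public static void main(String[] args) {
        // Simulate a broker handing back a zero-length member-assignment blob.
        ByteBuffer assignment = ByteBuffer.allocate(0);

        // ConsumerProtocol.deserializeAssignment() starts by reading the
        // 16-bit 'version' field; an empty buffer underflows immediately,
        // which Schema.read() wraps in the SchemaException shown above.
        short version = assignment.getShort(); // throws java.nio.BufferUnderflowException
        System.out.println("version=" + version);
    }
}
{code}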

What's fascinating about this is:
* We have a liveness probe (the Kubernetes term for a healthcheck whose 
failure causes the container backing the "pod" to be killed and restarted) 
attached to the existence of dead consumers -- roughly the shape sketched 
after this list.  When this situation happens, it _never_ resolves itself. 
Today, I found a pod that had been restarted 1023 times due to this error.
* The only way to make it go away is to _delete_ the Kubernetes pod.  This 
causes it to be replaced by a pod on another Kubernetes host ("minion") using 
the same Docker image and configuration.  Invariably, the new pod comes up and 
all consumers join just fine.
* We have _not_ tried restarting the brokers when this happens.  
* There must be something about the pod, container, or Kubernetes host that 
stays constant across pod crash loops and factors into the consumer group join 
process -- MAC address?  Hostname?  It can't be anything that is recomputed on 
JVM restart, though...
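
For context, the liveness wiring mentioned in the first bullet is roughly this 
shape (a simplified sketch, not our production code; the class name, endpoint, 
and port are illustrative):

{code}
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import com.sun.net.httpserver.HttpServer;

public class ConsumerLiveness {
    private static final AtomicBoolean consumerDead = new AtomicBoolean(false);

    // Invoked from the consumer thread's catch block when it dies,
    // e.g. on the SchemaException above ("Died!").
    public static void markDead() {
        consumerDead.set(true);
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/healthz", exchange -> {
            // A 503 fails the Kubernetes liveness probe, so the kubelet
            // kills and restarts the container backing the pod.
            exchange.sendResponseHeaders(consumerDead.get() ? 503 : 200, -1);
            exchange.close();
        });
        server.start();
    }
}
{code}

The point of the bug is that this restart loop never helps: the replacement 
JVM hits the same underflow on its first join.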

Seems like there's either:
# a bug in the client, i.e. in its assumption that it can always deserialize a 
protocol header on successful return of the join future -- maybe there's a 
flavor of broker response that doesn't include this header?  (A cheap 
pre-check along these lines is sketched after this list.)
# a bug in the broker, in that it's sending an empty or undersized response to 
a group join request in some situations.
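
If it's (1), a defensive check on the client side would at least turn this 
into a recoverable error rather than a dead consumer thread. A hypothetical 
guard (not actual Kafka code) ahead of the deserialize call:

{code}
import java.nio.ByteBuffer;

public class AssignmentGuard {
    // Hypothetical pre-check before ConsumerProtocol.deserializeAssignment().
    public static boolean looksDeserializable(ByteBuffer memberAssignment) {
        // The consumer protocol header starts with a 16-bit 'version' field,
        // so anything shorter than 2 bytes is guaranteed to underflow.
        // A false result could trigger a rejoin instead of letting the
        // SchemaException kill the consumer thread.
        return memberAssignment != null && memberAssignment.remaining() >= 2;
    }
}
{code}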

It's worth noting that the severity of this issue is magnified by the fact 
that it requires manual intervention.  It wouldn't be so bad if our 
healthcheck failure tripped a pod restart and the new JVM's consumers then 
joined OK.  But the fact that even a JVM restart doesn't fix it means most 
resiliency plays won't work.

BTW, I see a similar schema read failure in 
https://issues.apache.org/jira/browse/KAFKA-4349, although the client code is 
completely different (the admin {{ConsumerGroupCommand}}).


