[ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246677#comment-17246677
 ] 

Bruno Cadonna commented on KAFKA-10772:
---------------------------------------

I think I found the bug. When a static member joins the group, it gets a member 
ID that the broker uses to store the assignment for that static member. A 
static member gets a new member ID each time it joins the group. Suppose a 
static member joins the group and gets a member ID that is used during the next 
assignment. If the same static member joins the group again (e.g., after a 
Streams client restart) and gets a new member ID before the assignment is 
broadcast to the members in the SyncGroup response, the group coordinator will 
not find the new member ID in the assignment and will store an empty assignment 
(i.e., an empty byte buffer) for the new member ID. Hence, the static member 
will get an empty assignment and throw the above {{IllegalStateException}}.

For example, assume the following static members:
* member A with group instance ID A and initial member ID 1
* member B with group instance ID B and initial member ID 2

The group leader will send the following assignment to the group coordinator:
1 -> some assigned partitions
2 -> some other assigned partitions

Before the group coordinator gets this assignment and broadcasts it, member A 
rejoins and gets member ID 3. The group coordinator will store the following 
assignment:
2 -> some other assigned partitions
3 -> empty buffer

Member A, which now has member ID 3, will receive the empty buffer as its 
assignment.
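
The sequence above can be replayed with a small, self-contained simulation. 
This is only a sketch of the bookkeeping described in this comment, not the 
broker's actual group coordinator code: the class name, the {{join}}, 
{{storeAssignment}} and {{replay}} methods, and the two-byte payloads are all 
hypothetical and chosen purely for illustration.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class StaticMemberRaceSketch {

    // Hypothetical coordinator state: group instance ID -> current member ID.
    private final Map<String, String> memberIdByInstanceId = new LinkedHashMap<>();
    private int nextMemberId = 1;

    // Every join hands out a fresh member ID, even for a known static member.
    public String join(String groupInstanceId) {
        String memberId = String.valueOf(nextMemberId++);
        memberIdByInstanceId.put(groupInstanceId, memberId);
        return memberId;
    }

    // Stores the leader's assignment for the *current* member IDs. A member ID
    // that the leader did not know about ends up with an empty buffer.
    public Map<String, ByteBuffer> storeAssignment(Map<String, ByteBuffer> leaderAssignment) {
        Map<String, ByteBuffer> stored = new TreeMap<>();
        for (String memberId : memberIdByInstanceId.values()) {
            stored.put(memberId, leaderAssignment.getOrDefault(memberId, ByteBuffer.allocate(0)));
        }
        return stored;
    }

    // Replays the example: A and B join, the leader computes an assignment,
    // A rejoins before the assignment is stored.
    public static Map<String, ByteBuffer> replay() {
        StaticMemberRaceSketch coordinator = new StaticMemberRaceSketch();
        String memberA = coordinator.join("A"); // member ID 1
        String memberB = coordinator.join("B"); // member ID 2

        // Assignment computed by the leader for member IDs 1 and 2
        // (payload bytes are placeholders, not real assignment data).
        Map<String, ByteBuffer> leaderAssignment = new HashMap<>();
        leaderAssignment.put(memberA, ByteBuffer.wrap(new byte[] {0, 1}));
        leaderAssignment.put(memberB, ByteBuffer.wrap(new byte[] {2, 3}));

        coordinator.join("A"); // A rejoins and now has member ID 3

        return coordinator.storeAssignment(leaderAssignment);
    }

    public static void main(String[] args) {
        replay().forEach((memberId, buffer) ->
            System.out.println(memberId + " -> " + buffer.remaining() + " bytes"));
    }
}
```

In this sketch, member ID 1 no longer appears in the stored assignment, 
member ID 2 keeps its payload, and member ID 3 maps to a zero-length buffer, 
which corresponds to the "actual byte size 0" in the exception message.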

I could reproduce the issue with 2.4.1 brokers, but not with 2.6 brokers.   

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10772
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10772
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Levani Kokhreidze
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>         Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] State transition from REBALANCING to ERROR
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] State transition from REBALANCING to ERROR
> Nov 27 00:59:53.682 streaming-app service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream processing pipeline: [profile-stats] encountered unrecoverable exception. Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely dead. If all worker threads die, Kafka Streams will be moved to permanent ERROR state.
> Nov 27 00:59:53.682 streaming-app service: prod | streaming-app-2 | Stream processing pipeline: [profile-stats] encountered unrecoverable exception. Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely dead. If all worker threads die, Kafka Streams will be moved to permanent ERROR state.
> java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0) , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or because static member is configured and the protocol is buggy hence did not get the assignment for this member
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
