A. Sophie Blee-Goldman created KAFKA-14382:
----------------------------------------------

             Summary: StreamThreads can miss rebalance events when processing 
records during a rebalance
                 Key: KAFKA-14382
                 URL: https://issues.apache.org/jira/browse/KAFKA-14382
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: A. Sophie Blee-Goldman


One of the main improvements introduced by the cooperative rebalancing 
protocol was the ability to continue processing records during a rebalance. 
In Streams, we take advantage of this by polling with a timeout of 0 while a 
rebalance is in progress, so that #poll returns immediately and the 
StreamThread can continue through the main loop to process new records. The 
main poll loop uses an algorithm based on max.poll.interval.ms to ensure the 
StreamThread returns to call #poll in time to stay in the consumer group.
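
To make this concrete, here is a minimal sketch of that kind of poll loop; 
the names (rebalanceInProgress, runOnce, process, the 100 ms idle timeout) 
are hypothetical stand-ins, not the actual StreamThread code:

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
    // Toggled from a ConsumerRebalanceListener in the real code; a plain
    // flag here for illustration
    static volatile boolean rebalanceInProgress = false;

    static void runOnce(KafkaConsumer<byte[], byte[]> consumer, long maxPollIntervalMs) {
        // Poll with a timeout of 0 during a rebalance so the call returns
        // immediately and the thread can keep processing records
        Duration timeout = rebalanceInProgress ? Duration.ZERO : Duration.ofMillis(100);
        ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);

        // Bound the processing loop so we return to poll() well within
        // max.poll.interval.ms and stay in the consumer group
        long deadline = System.currentTimeMillis() + maxPollIntervalMs / 2;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            process(record);
            if (System.currentTimeMillis() > deadline) {
                break; // exit early to get back to poll() in time
            }
        }
    }

    static void process(ConsumerRecord<byte[], byte[]> record) {
        // application processing logic
    }
}
{code}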

Generally speaking, the thread should exit the processing loop and invoke 
#poll within a few minutes at most based on the poll interval, though 
typically it will break out much sooner, once it has used up all the records 
from the last poll (bounded by the max.poll.records config, which Streams 
sets to 1,000 by default). However, if it is doing heavy processing or 
max.poll.records has been raised, the thread may continue processing for 
more than a few seconds. If it had sent out a JoinGroup request before going 
on to process and was still waiting for its JoinGroup response, then once it 
does return to invoke #poll it will process that response and send out a 
SyncGroup request; but if the processing took too long, this SyncGroup may 
immediately fail with the REBALANCE_IN_PROGRESS error.
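
For reference, a rough sketch of the configs in play; the application id and 
bootstrap servers are placeholders, and the values shown are just for 
illustration:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Streams overrides the plain consumer default (500) to 1,000;
        // raising it further means more records to work through per loop
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 5000);

        // Upper bound on the time between poll() calls before the member
        // is considered to have left the group
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300000);
    }
}
{code}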

Essentially, while the thread is busy processing, the group leader is itself 
processing the JoinGroup subscriptions of all members and generating an 
assignment, which it then sends back in its SyncGroup request. This may take 
only a few seconds or less, and the group coordinator will not yet have 
noticed (or care) that one of the consumers hasn't sent a SyncGroup request; 
it will simply return the assigned partitions in the SyncGroup responses of 
the members who responded in time, and "complete" the rebalance in their 
eyes. But if the assignment involved moving any partitions from one consumer 
to another, then a follow-up rebalance must be triggered right away to 
finish assigning those partitions that were revoked in the first rebalance. 
This is what causes a new rebalance to be kicked off just seconds after the 
first one began.
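
A toy example of the "moved partitions force a follow-up" rule described 
above; the member and partition names are made up, and this is not the 
actual assignor logic:

{code:java}
import java.util.Map;
import java.util.Set;

public class FollowupRebalanceSketch {
    public static void main(String[] args) {
        // What each member claimed as owned when it (re)joined
        Map<String, Set<String>> owned = Map.of(
                "consumerA", Set.of("topic-0", "topic-1"),
                "consumerB", Set.of());
        // The assignment the leader wants: topic-1 moves from A to B
        Map<String, Set<String>> target = Map.of(
                "consumerA", Set.of("topic-0"),
                "consumerB", Set.of("topic-1"));

        boolean followupNeeded = false;
        for (Map.Entry<String, Set<String>> e : target.entrySet()) {
            for (String partition : e.getValue()) {
                // Under the cooperative protocol, a partition still owned by
                // a different member cannot be assigned yet: it must be
                // revoked first, and handing it out afterwards requires a
                // follow-up rebalance
                boolean ownedByOther = owned.entrySet().stream()
                        .anyMatch(o -> !o.getKey().equals(e.getKey())
                                && o.getValue().contains(partition));
                if (ownedByOther) {
                    followupNeeded = true;
                }
            }
        }
        System.out.println("follow-up rebalance needed: " + followupNeeded); // true
    }
}
{code}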

If the consumer that was stuck processing was among those that needed to 
revoke partitions, this can lead to repeated rebalances: since it fails the 
SyncGroup of the first rebalance, it never receives the assignment for that 
rebalance and never learns it should revoke those partitions, meaning it 
will rejoin the new rebalance still claiming them among its ownedPartitions. 
When the assignor generates the same assignment for the second rebalance, it 
will again see that some partitions need to be revoked and will therefore 
trigger yet another rebalance after the second one finishes. This can go on 
for as long as the StreamThreads are struggling to finish the JoinGroup 
phase in time due to processing.
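
The resulting cycle can be sketched with a toy simulation; the booleans are 
stand-ins for the protocol events described above, not real client code:

{code:java}
public class RebalanceCycleSketch {
    public static void main(String[] args) {
        // The stuck member still lists the moved partition in its
        // ownedPartitions because it never completed a SyncGroup
        boolean stuckMemberStillClaimsPartition = true;
        int rebalances = 0;

        while (stuckMemberStillClaimsPartition && rebalances < 5) {
            rebalances++;
            // The assignor wants to move the partition, so it must be
            // revoked first, and a follow-up rebalance is triggered
            boolean followupTriggered = stuckMemberStillClaimsPartition;

            // Stand-in for the member's SyncGroup failing with
            // REBALANCE_IN_PROGRESS because it processed too long;
            // flip to true to see the cycle end
            boolean syncGroupSucceeded = false;
            if (syncGroupSucceeded) {
                // Only on a successful SyncGroup does the member learn it
                // should revoke, which breaks the cycle
                stuckMemberStillClaimsPartition = false;
            }
            System.out.println("rebalance " + rebalances
                    + " -> follow-up triggered: " + followupTriggered);
        }
    }
}
{code}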


