[ 
https://issues.apache.org/jira/browse/KAFKA-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14382:
-------------------------------------------
    Description: 
One of the main improvements introduced by the cooperative protocol was the 
ability to continue processing records during a rebalance. In Streams, we take 
advantage of this by polling with a timeout of 0 while a rebalance is in 
progress, so that poll can return immediately and the thread can continue 
through the main loop to process new records. The main poll loop uses an 
algorithm based on max.poll.interval.ms to ensure the StreamThread returns to 
call #poll in time to stay in the consumer group.
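That timing check can be sketched roughly as follows. The names and the half-interval safety margin below are illustrative only; the actual StreamThread logic differs in detail:

```java
import java.time.Duration;
import java.time.Instant;

public class PollBudgetSketch {
    // Illustrative default for max.poll.interval.ms (5 minutes).
    static final Duration MAX_POLL_INTERVAL = Duration.ofMinutes(5);

    // Decide whether there is still time to process another batch of
    // records before the thread must return to Consumer#poll to stay in
    // the group. The half-interval margin is a made-up choice for
    // illustration, not the actual StreamThread heuristic.
    static boolean shouldKeepProcessing(Instant lastPoll, Instant now) {
        Duration elapsed = Duration.between(lastPoll, now);
        return elapsed.compareTo(MAX_POLL_INTERVAL.dividedBy(2)) < 0;
    }

    public static void main(String[] args) {
        Instant lastPoll = Instant.EPOCH;
        System.out.println(shouldKeepProcessing(lastPoll, lastPoll.plus(Duration.ofMinutes(1)))); // true
        System.out.println(shouldKeepProcessing(lastPoll, lastPoll.plus(Duration.ofMinutes(4)))); // false
    }
}
```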

 

Generally speaking, the thread should exit the processing loop and invoke poll 
within a few minutes at most based on the poll interval, though typically it 
will break out much sooner, once it has used up all the records from the last 
poll (based on the max.poll.records config, which Streams sets to 1,000 by 
default). However, if the thread is doing heavy processing, or 
max.poll.records is set higher, it may continue processing for more than a few 
seconds. If it had sent out a JoinGroup request before going on to process and 
was waiting for its JoinGroup response, then once it does return to invoke 
#poll it will process this response and send out a SyncGroup – but if the 
processing took too long, this SyncGroup may immediately fail with the 
REBALANCE_IN_PROGRESS error.

 

Essentially, while the thread is processing, the group leader is itself 
processing the JoinGroup subscriptions of all members and generating an 
assignment, then sending this back in its SyncGroup. This may take only a few 
seconds or less, and the group coordinator will not yet have noticed (or 
cared) that one of the consumers hasn't sent a SyncGroup – it will just return 
the assigned partitions in the SyncGroup responses of the members who 
responded in time, and "complete" the rebalance in their eyes. But if the 
assignment involved moving any partitions from one consumer to another, the 
leader will need to trigger a follow-up rebalance right away to finish 
assigning those partitions which were revoked in the previous rebalance. This 
is what causes a new rebalance to be kicked off just seconds after the first 
one began.
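The follow-up trigger can be illustrated with a simplified sketch: under the cooperative protocol, a partition that must move is revoked in one rebalance and only handed out in the next, so any nonempty revocation set forces another round. The names below are illustrative, not the actual assignor code:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CooperativeSketch {
    // Partitions a member must give up this rebalance: everything it owns
    // that is not in its new target assignment.
    static Set<String> toRevoke(Set<String> owned, Set<String> newlyAssigned) {
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(newlyAssigned);
        return revoked;
    }

    // Under cooperative rebalancing, revoked partitions are withheld from
    // the current assignment, so any nonempty revocation set means the
    // leader must trigger a follow-up rebalance to hand them out.
    static boolean needsFollowup(Map<String, Set<String>> owned,
                                 Map<String, Set<String>> assigned) {
        for (Map.Entry<String, Set<String>> e : owned.entrySet()) {
            Set<String> target = assigned.getOrDefault(e.getKey(), Set.of());
            if (!toRevoke(e.getValue(), target).isEmpty()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // p1 moves from consumerA to consumerB, so a follow-up is needed.
        Map<String, Set<String>> owned = Map.of("consumerA", Set.of("p0", "p1"),
                                                "consumerB", Set.of());
        Map<String, Set<String>> target = Map.of("consumerA", Set.of("p0"),
                                                 "consumerB", Set.of("p1"));
        System.out.println(needsFollowup(owned, target)); // prints true
    }
}
```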

 

If the consumer that was stuck processing was among those that needed to 
revoke partitions, this can lead to repeating rebalances: since it fails the 
SyncGroup of the first rebalance, it never receives that assignment and never 
learns that it should revoke those partitions, meaning it will rejoin the new 
rebalance still claiming them among its ownedPartitions. When the assignor 
generates the same assignment for the second rebalance, it will again see that 
some partitions need to be revoked and will therefore trigger yet another 
rebalance after finishing the second. This can go on for as long as the 
StreamThreads are struggling to finish the JoinGroup phase in time due to 
processing.

 

Note that the best workaround at the moment is probably just to set a lower 
max.poll.records to reduce the duration of the processing loop.
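A minimal sketch of that workaround, using the plain consumer config key (the value 100 is purely illustrative, not a recommendation; tune it for your workload):

```java
import java.util.Properties;

public class WorkaroundConfig {
    // Lower max.poll.records so each processing loop handles fewer records
    // and returns to poll sooner. Streams defaults this consumer config to
    // 1,000; 100 here is an illustrative value only.
    public static Properties withLowerMaxPollRecords() {
        Properties props = new Properties();
        props.put("max.poll.records", 100);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withLowerMaxPollRecords());
    }
}
```

These properties would be merged into the rest of the application's Streams configuration before constructing the topology.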

> StreamThreads can miss rebalance events when processing records during a 
> rebalance
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14382
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
