[ 
https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9752:
-----------------------------------
    Description: 
For older versions of the JoinGroup protocol (v3 and below), there was no way 
for new consumer group members to get their memberId until the first rebalance 
completed. If the JoinGroup request timed out and the client disconnected, the 
member would nevertheless be left in the group until the rebalance completed 
and the session timeout expired. 

In order to prevent this situation from causing the group size to grow 
unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
member can be left in the group before it is kicked out (regardless of the 
rebalance state). 

In KAFKA-9232, we addressed one issue with this solution: the new member 
expiration logic was not properly cancelled after the rebalance completed, 
which meant that in certain cases a successfully joined member could be 
kicked out of the group unnecessarily. 

Unfortunately, this patch introduced a regression in the normal session 
expiration logic following completion of the initial rebalance: the session 
expiration task is not scheduled properly. The issue is in this function:

{code}
  def shouldKeepAlive(deadlineMs: Long): Boolean = {
    if (isNew) {
      // New members are expired after the static join timeout
      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
    } else if (isAwaitingJoin || isAwaitingSync) {
      // Don't remove members as long as they have a request in purgatory
      true
    } else {
      // Otherwise check for session expiration
      latestHeartbeat + sessionTimeoutMs > deadlineMs
    }
  }
{code}

We use this logic to check for session expiration. On the surface there is 
nothing wrong with it, but it has an odd interaction with the purgatory. When 
the heartbeat expiration task is first passed to `tryCompleteElseWatch`, the 
code relies on `shouldKeepAlive` returning false so that the task is not 
immediately completed. This only works because we update `latestHeartbeat` 
just prior to calling `tryCompleteElseWatch`, which means that whichever of 
the first or third checks applies will fail, `shouldKeepAlive` will return 
false, and the heartbeat expiration task will not be immediately completed. 
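As a rough illustration of this ordering dependency, the Scala sketch below models a heartbeat expiration task and a single-threaded stand-in for the purgatory. `HeartbeatTask`, `SimplePurgatory`, `advanceTo` and `watchedCount` are hypothetical names, not Kafka's actual purgatory API; the only behavior modeled is that a task whose completion check succeeds inside `tryCompleteElseWatch` is completed at once and never watched, so it can never expire. The completion check used here is the session branch of `shouldKeepAlive`, evaluated at an assumed deadline of `latestHeartbeat + sessionTimeoutMs` with an assumed 10 second session timeout.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a delayed heartbeat operation. `keepAlive` plays
// the role of shouldKeepAlive and is evaluated at the task's deadline.
final class HeartbeatTask(keepAlive: Long => Boolean, val deadlineMs: Long) {
  var completed: Boolean = false
  var expired: Boolean = false

  // Completes the task if the member should be kept alive at the deadline.
  def tryComplete(): Boolean = {
    if (!completed && keepAlive(deadlineMs)) completed = true
    completed
  }

  // Called only for tasks that are still pending when their deadline passes.
  def onExpiration(): Unit = {
    expired = true
  }
}

// Hypothetical single-threaded model of the purgatory's watch/expire behavior.
final class SimplePurgatory {
  private val watched = mutable.ArrayBuffer.empty[HeartbeatTask]

  // Follows the tryCompleteElseWatch contract: a task that completes on the
  // first check is reported as done and is never watched, so it cannot expire.
  def tryCompleteElseWatch(task: HeartbeatTask): Boolean =
    if (task.tryComplete()) true
    else {
      watched += task
      false
    }

  // Expires every watched task whose deadline is at or before nowMs and
  // returns how many tasks expired.
  def advanceTo(nowMs: Long): Int = {
    val (due, pending) = watched.partition(_.deadlineMs <= nowMs)
    due.foreach(_.onExpiration())
    watched.clear()
    watched ++= pending
    due.size
  }

  def watchedCount: Int = watched.size
}

// Session timeout path for a member that is not new (third branch).
val sessionTimeoutMs = 10000L // assumed 10 s session timeout
val latestHeartbeat = 1000L   // updated just before scheduling
val deadlineMs = latestHeartbeat + sessionTimeoutMs

val task = new HeartbeatTask(d => latestHeartbeat + sessionTimeoutMs > d, deadlineMs)
val purgatory = new SimplePurgatory

// 11000 > 11000 is false, so the task is not completed and is watched instead.
val completedNow = purgatory.tryCompleteElseWatch(task)

// Once the deadline passes, the watched task expires as intended.
val expiredCount = purgatory.advanceTo(deadlineMs)
```

If the check passed to `HeartbeatTask` returned true at the deadline, `tryCompleteElseWatch` would return true, the task would never be added to the watched set, and `advanceTo` could never expire it.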

The bug occurs when `isNew` is true. When we schedule the session expiration 
task, the `isNew` flag is still set, which means we hit the first check above. 
Since the session timeout is in most cases less than the new member timeout of 
5 minutes, the check is very likely to return true. This may seem like what we 
would want, but as noted above, we rely on this function returning false when 
the expiration task is passed to `tryCompleteElseWatch`. Since it returns true 
instead, the task completes immediately, which means its expiration can never 
fire.
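The arithmetic behind this can be checked directly. The Scala sketch below re-creates the branch structure of `shouldKeepAlive` on a simplified, hypothetical `Member` case class (the real member metadata carries more state) and evaluates it at a deadline of `latestHeartbeat + sessionTimeoutMs`, assuming a 10 second session timeout and the 5 minute new member timeout. The `GroupCoordinator` object here is only a stand-in holding that constant.

```scala
// Stand-in holding only the constant referenced by shouldKeepAlive.
object GroupCoordinator {
  val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000 // 5 minutes
}

// Simplified, hypothetical member state: only the fields shouldKeepAlive reads.
final case class Member(
    isNew: Boolean,
    isAwaitingJoin: Boolean,
    isAwaitingSync: Boolean,
    latestHeartbeat: Long,
    sessionTimeoutMs: Long
) {
  // Same branch structure as shouldKeepAlive in the description.
  def shouldKeepAlive(deadlineMs: Long): Boolean =
    if (isNew) {
      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
    } else if (isAwaitingJoin || isAwaitingSync) {
      true
    } else {
      latestHeartbeat + sessionTimeoutMs > deadlineMs
    }
}

val latestHeartbeat = 1000L   // updated just before the task is scheduled
val sessionTimeoutMs = 10000L // assumed 10 s session timeout
// The session expiration task's deadline is based on the session timeout.
val deadlineMs = latestHeartbeat + sessionTimeoutMs

// isNew is still set when the session expiration task is scheduled.
val staleNewMember = Member(isNew = true, isAwaitingJoin = false, isAwaitingSync = false,
  latestHeartbeat = latestHeartbeat, sessionTimeoutMs = sessionTimeoutMs)
val clearedMember = staleNewMember.copy(isNew = false)

// 1000 + 300000 > 11000 is true, so the task would complete immediately.
val staleResult = staleNewMember.shouldKeepAlive(deadlineMs)
// 1000 + 10000 > 11000 is false, so the task would be watched and can expire.
val clearedResult = clearedMember.shouldKeepAlive(deadlineMs)
```

Only the `isNew` flag differs between the two members, which is why a session expiration task scheduled while the flag is still set completes on its first check instead of waiting for its deadline.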

In the worst case, this bug can leave a consumer group in the 
`PreparingRebalance` state indefinitely. This state will persist until there 
is a coordinator change (e.g. as a result of restarting the broker). Note that 
this is only possible if 1) we have a consumer using an old JoinGroup version, 
and 2) the consumer times out and disconnects from its initial JoinGroup 
request. 




  was:
For older versions of the JoinGroup protocol, there was no way for new consumer 
group members to get their memberId until the first rebalance completed. If the 
JoinGroup request timed out and the client disconnected, the member would 
nevertheless be left in the group until the rebalance completed and the session 
timeout expired. 

In order to prevent this situation from causing the group size to grow 
unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
member can be left in the group before it is kicked out (regardless of the 
rebalance state). 

In KAFKA-9232, we addressed one issue with this solution: the new member 
expiration logic was not properly cancelled after the rebalance completed, 
which meant that in certain cases a successfully joined member could be 
kicked out of the group unnecessarily. 

Unfortunately, this patch introduced a regression in the normal session 
expiration logic following completion of the initial rebalance: the session 
expiration task is not scheduled properly. The issue is in this function:

{code}
  def shouldKeepAlive(deadlineMs: Long): Boolean = {
    if (isNew) {
      // New members are expired after the static join timeout
      latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
    } else if (isAwaitingJoin || isAwaitingSync) {
      // Don't remove members as long as they have a request in purgatory
      true
    } else {
      // Otherwise check for session expiration
      latestHeartbeat + sessionTimeoutMs > deadlineMs
    }
  }
{code}

We use this logic to check for session expiration. On the surface there is 
nothing wrong with it, but it has an odd interaction with the purgatory. When 
the heartbeat expiration task is first passed to `tryCompleteElseWatch`, the 
code relies on `shouldKeepAlive` returning false so that the task is not 
immediately completed. This only works because we update `latestHeartbeat` 
just prior to calling `tryCompleteElseWatch`, which means that whichever of 
the first or third checks applies will fail, `shouldKeepAlive` will return 
false, and the heartbeat expiration task will not be immediately completed. 

The bug occurs when `isNew` is true. When we schedule the session expiration 
task, the `isNew` flag is still set, which means we hit the first check above. 
Since the session timeout is in most cases less than the new member timeout of 
5 minutes, the check is very likely to return true. This may seem like what we 
would want, but as noted above, we rely on this function returning false when 
the expiration task is passed to `tryCompleteElseWatch`. Since it returns true 
instead, the task completes immediately, which means its expiration can never 
fire.

In the worst case, this bug can leave a consumer group in the 
`PreparingRebalance` state indefinitely. This state will persist until there 
is a coordinator change (e.g. as a result of restarting the broker). Note that 
this is only possible if 1) we have a consumer using an old JoinGroup version, 
and 2) the consumer times out and disconnects from its initial JoinGroup 
request. 





> Consumer rebalance can be stuck after new member timeout with old JoinGroup version
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9752
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.2, 2.3.1, 2.4.1
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Blocker
>             Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.2
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
