[ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303621#comment-17303621
 ] 

Guozhang Wang edited comment on KAFKA-12472 at 3/17/21, 6:31 PM:
-----------------------------------------------------------------

Good questions. I'd like to dump some of my rough reasoning here, and we can 
iterate on the precedence ordering.

* As a rule of thumb, I think *CoordinatorRequested* itself should only be the 
root cause of a rebalance when the broker proactively kicks a member and 
notifies everyone else to re-join (note that if the member's hb thread itself 
realizes it has expired, and hence issues a leave-group, that should be a 
different state, *DroppedGroup*), which would be a very rare case. For example, 
let's say memberA newly joins and sets its state to *NewMember*, while another 
memberB triggers a rebalance after generation1 has just formed but before its 
assignment has completed (say it is due to *DroppedGroup*); memberA sees 
rebalance-in-progress in the sync response and still remains in *NewMember* at 
the new generation2. The group's aggregated rebalance state should be 
*NewMember* during generation1 and then *DroppedGroup* at generation2. As for 
individual members, I think it is okay to let memberA remain in *NewMember* at 
generation2 since from its perspective, it is sort of "I tried to join the 
group and complete a rebalance as a new member, and until I've finally 
completed a successful rebalance I'm just considering myself stuck in this 
status". On the other hand, if generation1 actually completes and then memberB 
triggers a rebalance, even just 10ms later, memberA would still first transit 
back to *None* and then to *CoordinatorRequested*. The aggregated group state 
has a similar history: first *NewMember*, then very briefly *None* for a few 
milliseconds, and then *DroppedGroup*. NOTE that in practice, since we would 
not poll / aggregate metrics every millisecond, we would likely miss the 
transient *None* state at the group level, which would leave us with the same 
history as if generation1 never completed before moving to generation2. From 
the operator's pov this should be okay.
* Now, what if *DroppedGroup* is triggered first by memberB and then a new 
memberA joins? I.e. why give *DroppedGroup* a higher precedence, so that in 
this case the state would not transit back to *NewMember* if generation1 does 
not complete and immediately turns into generation2? Here my only motivation is 
that DroppedGroup is probably more severe than NewMember from a troubleshooting 
perspective -- I know this is rather bold, and I welcome feedback. I'm also 
open to saying that we do not enforce a strict precedence ordering on state 
transitions for a single consumer member, but allow it to transit to any state; 
only when aggregating per-group (streams-app) / per-streams-client / per-query 
(the last is specific to ksql and probably does not need to be discussed in AK; 
I'm just listing it here) would we rely on the precedence ordering.
* Finally, for the Streams extended states, I have similar reasoning: if a 
rebalance in generation1 did not complete and we immediately move to the next 
generation, it is better to keep the original generation's reason from a higher 
Streams state than to transit to a lower state (e.g. *AssignmentProbing* --> 
*DroppedGroup*), since from a troubleshooting perspective we'd like to keep the 
most severe root cause. But similarly, I admit this can be done by allowing a 
single consumer to transit arbitrarily while only enforcing the precedence 
ordering in the aggregation rules.
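
To make the "transit freely per member, apply precedence only when aggregating" 
option concrete, here is a rough Java sketch of the memberA / memberB example 
above. All names (MemberRebalanceState, GroupStateAggregator) are made up for 
illustration and are not existing Kafka classes; only the states used in the 
example are listed, with the codes from the proposed ordering.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: these names are illustrative, not existing Kafka classes.
enum MemberRebalanceState {
    // Only the states used in the example; codes follow the proposed ordering.
    NONE(0), COORDINATOR_REQUESTED(1), NEW_MEMBER(2), DROPPED_GROUP(5);

    final int code;

    MemberRebalanceState(int code) {
        this.code = code;
    }
}

final class GroupStateAggregator {
    // A member may hold any state; precedence is applied only here, by taking
    // the state with the largest code across all members of the group.
    static MemberRebalanceState aggregate(List<MemberRebalanceState> memberStates) {
        return memberStates.stream()
                .max(Comparator.comparingInt((MemberRebalanceState s) -> s.code))
                .orElse(MemberRebalanceState.NONE);
    }

    public static void main(String[] args) {
        // generation1: memberA is joining, memberB has no rebalance reason yet.
        System.out.println(aggregate(List.of(
                MemberRebalanceState.NEW_MEMBER, MemberRebalanceState.NONE)));
        // generation2: memberA is still joining, memberB dropped out of the group.
        System.out.println(aggregate(List.of(
                MemberRebalanceState.NEW_MEMBER, MemberRebalanceState.DROPPED_GROUP)));
    }
}
```

With this shape, memberA can stay in *NewMember* across both generations while 
the group-level value still moves from *NewMember* to *DroppedGroup*.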

LMK what do you think.



> Add a Consumer / Streams metric to indicate the current rebalance status
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-12472
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12472
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, streams
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: needs-kip
>
> Today, to troubleshoot a rebalance issue, operators need to do a lot of 
> manual steps: locating the problematic members, searching the log entries, 
> and looking for related metrics. It would be great to add a single metric 
> that covers all these manual steps, so that operators would only need to 
> check this single signal to find the root cause. A concrete idea is to expose 
> two enum gauge metrics, on consumer and streams respectively:
> * Consumer level (the order below is by design, see Streams level for 
> details):
>   0. *None* => there is no rebalance ongoing.
>   1. *CoordinatorRequested* => when any coordinator response contains a 
> RebalanceInProgress error code.
>   2. *NewMember* => when the join group response has a MemberIdRequired error 
> code.
>   3. *UnknownMember* => when any coordinator response contains an 
> UnknownMember error code, indicating this member has already been kicked out 
> of the group.
>   4. *StaleMember* => when any coordinator response contains an 
> IllegalGeneration error code.
>   5. *DroppedGroup* => when the hb thread decides to call leaveGroup because 
> the hb has expired.
>   6. *UserRequested* => when leaveGroup is called upon the shutdown / 
> unsubscribeAll API, as well as upon calling the enforceRebalance API.
>   7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
>   8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
> changed.
>   9. *RetryOnError* => when the join/syncGroup response contains a retriable 
> error which would cause the consumer to back off and retry.
>  10. *RevocationNeeded* => requestRejoin triggered since the set of revoked 
> partitions is not empty.
> The transition rule is that a non-zero status code can only transit to zero 
> or to a higher code, but not to a lower code (same for streams, see the 
> rationale below).
> * Streams level: today a streams client can have multiple consumers. We 
> introduce some new enum states as well as aggregation rules across 
> consumers: if there are no streams-layer events, as listed below, that 
> transit its status (i.e. the streams layer thinks it is 0), then we aggregate 
> across all the embedded consumers and take the largest status code value as 
> the streams metric; if there are streams-layer events that determine its 
> status should be 10+, then it ignores all embedded consumer-layer status 
> codes since it has a higher precedence. In addition, when creating an 
> aggregated metric across streams instances (a.k.a. at the app level, which is 
> usually what we would care about and alert on), we follow the same 
> aggregation rule, e.g. if there are two streams instances where one 
> instance's status code is 1) and the other's is 10), then the app's status is 
> 10).
>  10. *RevocationNeeded* => the definition is extended to the original 10) 
> defined for the consumer above, OR the leader decides to revoke either 
> active/standby tasks and hence schedules follow-ups.
>  11. *AssignmentProbing* => the leader decides to schedule follow-ups since 
> the current assignment is unstable.
>  12. *VersionProbing* => the leader decides to schedule follow-ups due to 
> version probing.
>  13. *EndpointUpdate* => anyone decides to schedule follow-ups due to 
> endpoint updates.
> The main motivations for the proposed precedence order are the following:
> 1. When a rebalance is triggered by one member, all other members would only 
> know it is due to CoordinatorRequested from coordinator error codes, and 
> hence CoordinatorRequested should be overridden by any other status when 
> aggregating across clients.
> 2. DroppedGroup could cause unknown/stale members that would fail and retry 
> immediately, and hence should take higher precedence.
> 3. The Revocation definition is extended in Streams, and hence it needs to 
> take the highest precedence among all consumer-only statuses so that it would 
> not be overridden by any of them.
> 4. In general, rarer events get higher precedence.
> This is proposed on top of KAFKA-12352. Any comments on the precedence rules 
> / categorization are more than welcome!
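
The status codes, the transition rule, and the Streams aggregation rule in the 
description above could be sketched in Java roughly as follows. This is only an 
illustration under assumptions: RebalanceStatus and StatusRules are 
hypothetical names rather than part of any Kafka API, and treating "stay in the 
same status" as an allowed no-op is an assumption the description does not 
spell out.

```java
import java.util.List;

// Hypothetical sketch: RebalanceStatus and StatusRules are illustrative names.
enum RebalanceStatus {
    // Declaration order matches the proposed codes 0..13, so ordinal() is the code.
    NONE, COORDINATOR_REQUESTED, NEW_MEMBER, UNKNOWN_MEMBER, STALE_MEMBER,
    DROPPED_GROUP, USER_REQUESTED, METADATA_CHANGED, SUBSCRIPTION_CHANGED,
    RETRY_ON_ERROR, REVOCATION_NEEDED, ASSIGNMENT_PROBING, VERSION_PROBING,
    ENDPOINT_UPDATE;

    int code() {
        return ordinal();
    }

    // Transition rule: a status may go back to NONE or up to a higher code,
    // never down to a lower non-zero code. Staying put is a no-op (assumption).
    boolean canTransitTo(RebalanceStatus next) {
        return next == NONE || next.code() >= code();
    }
}

final class StatusRules {
    // Largest code wins; an empty list aggregates to NONE. The same rule is
    // used across embedded consumers and across streams instances (app level).
    static RebalanceStatus max(List<RebalanceStatus> statuses) {
        RebalanceStatus result = RebalanceStatus.NONE;
        for (RebalanceStatus s : statuses) {
            if (s.code() > result.code()) {
                result = s;
            }
        }
        return result;
    }

    // Streams-client level: a streams-layer status of 10+ overrides the
    // embedded consumers; otherwise take the max across the consumers.
    static RebalanceStatus streamsClientStatus(RebalanceStatus streamsLayer,
                                               List<RebalanceStatus> consumers) {
        if (streamsLayer.code() >= RebalanceStatus.REVOCATION_NEEDED.code()) {
            return streamsLayer;
        }
        return max(consumers);
    }
}
```

For instance, StatusRules.max over two instances at codes 1 and 10 yields the 
status with code 10, matching the app-level example in the description.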



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
