Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lucasbru merged PR #15415: URL: https://github.com/apache/kafka/pull/15415 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on PR #15415: URL: https://github.com/apache/kafka/pull/15415#issuecomment-1961554165 Closing and re-opening PR to trigger build
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm closed pull request #15415: KAFKA-16258: callback to release assignment when stale member leaves group URL: https://github.com/apache/kafka/pull/15415
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on PR #15415: URL: https://github.com/apache/kafka/pull/15415#issuecomment-1961481081 Changes done @lucasbru, thanks!
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1500782762
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,
Review Comment: I see. I was hesitant about forcing that transition with a `transitionToPrepareLeaving` and also changing the prepare-leaving concept, but I totally get the gain of reusing the state, and thinking about it slightly differently convinced me: I reused the state, but did not expose it at the HBMgr level, only internally in the membershipMgr. So we do simplify all those transitions, we still keep a single `membershipManager.transitionToSendingLeaveGroup(dueToPollTimerExpired)` in the HBMgr, and internally, in its implementation, we do the transient transition through prepare_leaving if the poll timer expired. What do you think? Nice result, I believe. Thanks for the very helpful feedback!
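A minimal sketch of the shape described in the comment above: a single leave entry point that internally passes through an empty preparation phase when the poll timer expired. This is not the actual Kafka code. `SketchState`, `SketchMembershipManager`, `prepareLeaving`, and the reduced set of valid previous states are hypothetical names and assumptions made for illustration; only `transitionToSendingLeaveGroup(dueToPollTimerExpired)` and `onHeartbeatRequestSent()` are names taken from the discussion, and their bodies here are guesses at the idea, not the real implementation.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced set of states; the real MemberState enum has more values.
enum SketchState { STABLE, PREPARE_LEAVING, LEAVING, STALE, UNSUBSCRIBED }

// Illustrative stand-in for the membership manager, not the actual Kafka class.
class SketchMembershipManager {
    // Allowed previous states per target state (an assumed subset for this sketch).
    private static final Map<SketchState, Set<SketchState>> VALID_PREVIOUS =
        new EnumMap<>(SketchState.class);
    static {
        VALID_PREVIOUS.put(SketchState.PREPARE_LEAVING, EnumSet.of(SketchState.STABLE));
        // LEAVING is reachable only from PREPARE_LEAVING, which keeps the state machine small.
        VALID_PREVIOUS.put(SketchState.LEAVING, EnumSet.of(SketchState.PREPARE_LEAVING));
        VALID_PREVIOUS.put(SketchState.STALE, EnumSet.of(SketchState.LEAVING));
        VALID_PREVIOUS.put(SketchState.UNSUBSCRIBED, EnumSet.of(SketchState.LEAVING));
    }

    private SketchState state = SketchState.STABLE;
    private boolean leaveDueToPollTimerExpired = false;

    SketchState state() { return state; }

    private void transitionTo(SketchState next) {
        Set<SketchState> allowed =
            VALID_PREVIOUS.getOrDefault(next, EnumSet.noneOf(SketchState.class));
        if (!allowed.contains(state)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }

    // Regular leave (unsubscribe or close): callbacks would run during this phase.
    void prepareLeaving() {
        transitionTo(SketchState.PREPARE_LEAVING);
    }

    // Single entry point for the heartbeat side in this sketch.
    void transitionToSendingLeaveGroup(boolean dueToPollTimerExpired) {
        if (dueToPollTimerExpired) {
            // Empty preparation phase: pass through PREPARE_LEAVING without running callbacks.
            transitionTo(SketchState.PREPARE_LEAVING);
        }
        leaveDueToPollTimerExpired = dueToPollTimerExpired;
        transitionTo(SketchState.LEAVING);
    }

    // After the leave heartbeat goes out, pick the follow-up state (assumed behaviour).
    void onHeartbeatRequestSent() {
        if (state == SketchState.LEAVING) {
            transitionTo(leaveDueToPollTimerExpired ? SketchState.STALE : SketchState.UNSUBSCRIBED);
        }
    }
}
```

With this shape the transient step stays an internal detail of the manager, so the caller does not need to know whether a preparation phase happened.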
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lucasbru commented on PR #15415: URL: https://github.com/apache/kafka/pull/15415#issuecomment-1961282993 Let me know if you are planning to change something. Otherwise, I think we can merge it.
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lucasbru commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1500621907
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,
Review Comment: What I meant was, just do `transitionToPrepareLeaving(); transitionToLeaving()`, to simplify the state machine. Then, whether we heartbeat or not would not matter. Just use an empty preparation phase, instead of skipping it entirely. But all of these are just ideas, I'll leave it to you to decide whether it's worth it or not.
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on PR #15415: URL: https://github.com/apache/kafka/pull/15415#issuecomment-1960700348 Merged the latest trunk changes, fixing the build.
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on PR #15415: URL: https://github.com/apache/kafka/pull/15415#issuecomment-1960329387 Hey @lucasbru. Great push for simplification. I went as far as I could in unifying the leave operations, while still living with the fact that the expired timer brings up a leave path that's quite different from the existing one. Thanks for the comments, all addressed.
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1499941957
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -564,7 +587,7 @@ public void transitionToJoining() {
 */
 @Override
 public CompletableFuture leaveGroup() {
-if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {
Review Comment: Filed https://issues.apache.org/jira/browse/KAFKA-16301 for reviewing the callback interactions in edge cases of fence+unsubscribe+subscribe
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1499920005
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,
Review Comment: I would say we shouldn't transition through prepare_leaving because it represents a member that is still part of the group, which is not the case in this situation, where the first thing is to send the leave group and then everything else. This means that a prepare-leaving member continues to send HB, for instance, and needs to be considered in all the interactions that could happen while heartbeating. Of course we could attempt to reuse the state, but it would probably be confusing to be preparing to leave when we already sent the leave group, and it would complicate the now clear decision to HB while prepare leaving and not HB while stale.
The fundamental difference in the sequence of the two kinds of leave makes it tricky to reuse the whole path: a regular leave needs to prepare to leave and then leave. The expiration leave needs to leave first, and then go to a state where it needs to do different things than the prepare leave: it needs to stop HB, release the assignment (the only part similar to prepare leaving), wait for the timer reset, and then rejoin. All different needs than the prepare, except for the release of the assignment. Most of these transitions existed before, btw (as previous to STALE); they just moved places because we're reusing LEAVING now. So in that sense the change is not introducing much to the state machine, just moving transitions to reuse the existing LEAVING. With the change [8440b3a](https://github.com/apache/kafka/pull/15415/commits/8440b3a405522037f1d7387289c0f2d9271c76cf) for a unified leave I think it's better, and I removed the comment since I believe the intention is clearer with the leave group change now. Please take a look and let me know if it aligns better.
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1499894219
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 }
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
+membershipManager.onHeartbeatRequestSent();
Review Comment: Yes, makes sense, done. I had considered it but wasn't convinced, thinking that it would complicate the leaving on the membershipMgr, but with your comment I took another look and it does seem better actually: just a single leaving, which in some cases is due to the poll timer expiring (and that allows the manager to make the right transitions after leaving).
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1499559758
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
 return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
+@Override
+public void maybeRejoinStaleMember() {
+if (state == MemberState.STALE) {
+staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());
Review Comment: Sure, good catch, done. We truly only need it when an expired poll timer is reset. Note that I added the check in the HB (where I'm trying to keep the poll timer logic), and the membershipMgr is only concerned with the stale situation (which is more than just the timer concept, because it includes the callbacks that may take a while).
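A minimal sketch of the guard discussed above, assuming the rejoin continuation should be registered only by the poll that resets an expired timer rather than by every poll. `SketchStaleMember`, `SketchPollTimerOwner`, `resetPollTimer`, and the counters are hypothetical names for illustration; the real check lives in the heartbeat manager and may look different.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the membership manager side.
class SketchStaleMember {
    // Completes when the callbacks that release the assignment have finished.
    final CompletableFuture<Void> staleMemberAssignmentRelease = new CompletableFuture<>();
    int continuationsRegistered = 0;
    boolean rejoining = false;

    void maybeRejoinStaleMember() {
        continuationsRegistered++;
        // Rejoin only once the assignment release has completed.
        staleMemberAssignmentRelease.whenComplete((ignored, error) -> rejoining = true);
    }
}

// Hypothetical stand-in for the heartbeat manager side, which owns the poll timer.
class SketchPollTimerOwner {
    private final SketchStaleMember member;
    private boolean pollTimerExpired;

    SketchPollTimerOwner(SketchStaleMember member, boolean pollTimerExpired) {
        this.member = member;
        this.pollTimerExpired = pollTimerExpired;
    }

    // Called on every application poll in this sketch.
    void resetPollTimer() {
        boolean wasExpired = pollTimerExpired;
        pollTimerExpired = false;
        if (wasExpired) {
            // Only the reset of an expired timer triggers the rejoin attempt,
            // so repeated polls do not pile up continuations on the future.
            member.maybeRejoinStaleMember();
        }
    }
}
```

Keeping the timer check on the heartbeat side and the stale handling on the membership side mirrors the split described in the comment.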
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1499490700
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -564,7 +587,7 @@ public void transitionToJoining() {
 */
 @Override
 public CompletableFuture leaveGroup() {
-if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {
Review Comment: Good point, I think we could reuse it and I think it's a good direction. It does require taking care of some other details: we need to make sure that when the fencing callbacks complete, we don't blindly transition to joining (which is the current shape). Also, when using `notInGroup` here for a no-op leave, the fencing case is a bit different than the others, because we do need to transition to UNSUBSCRIBED (other states remain unchanged if a leave group call happens). All done.
This suggestion also makes me think about another tricky situation that I will leave to review separately, if that's ok. I guess we could get a leave group while fenced, then the user subscribes again, and the member will rejoin while the onPartitionsLost may not have completed yet. The only concern is that we could get an assignment and end up running onPartitionsAssigned while onPartitionsLost is still running. That's not the behaviour in the old consumer (callback execution blocks in the coordinator). I will file a jira to review it better, as it would definitely complicate things more (as callbacks always do).
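A minimal sketch of the no-op leave described above, assuming a `notInGroup` check that covers UNSUBSCRIBED, FENCED, FATAL, and STALE. The exact set of states, the `SketchMemberState` enum, and the `SketchLeaveGroup` class are assumptions for illustration, not the actual implementation. The point shown is that a fenced member moves to UNSUBSCRIBED on a leave call, while the other not-in-group states stay unchanged.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced set of states for this sketch.
enum SketchMemberState { STABLE, FENCED, STALE, FATAL, UNSUBSCRIBED, PREPARE_LEAVING }

// Illustrative stand-in, not the actual Kafka class.
class SketchLeaveGroup {
    SketchMemberState state;

    SketchLeaveGroup(SketchMemberState initial) {
        this.state = initial;
    }

    // Assumed definition: states in which the member is not part of the group.
    boolean notInGroup() {
        return state == SketchMemberState.UNSUBSCRIBED
            || state == SketchMemberState.FENCED
            || state == SketchMemberState.FATAL
            || state == SketchMemberState.STALE;
    }

    CompletableFuture<Void> leaveGroup() {
        if (notInGroup()) {
            if (state == SketchMemberState.FENCED) {
                // Fenced is the special case: record that the member should not rejoin
                // when the fencing callbacks complete.
                state = SketchMemberState.UNSUBSCRIBED;
            }
            // Nothing to send to the broker; the leave is a no-op.
            return CompletableFuture.completedFuture(null);
        }
        // Regular path: callbacks and the leave heartbeat would follow from here.
        state = SketchMemberState.PREPARE_LEAVING;
        return new CompletableFuture<>();
    }
}
```

In this sketch the returned future is already complete for the no-op case, and left incomplete for the regular path, where the real flow would complete it once the leave finishes.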
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lucasbru commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1499180116
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 }
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
+membershipManager.onHeartbeatRequestSent();
Review Comment: Would it not be more consistent to run this on a leave heartbeat as well, and just handle the state correctly internally?
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
 return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
+@Override
+public void maybeRejoinStaleMember() {
+if (state == MemberState.STALE) {
+staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());
Review Comment: Can we avoid enqueuing many of these completion stages while we haven't finished the `staleMemberAssignmentRelease`? Although it looks like the operation is idempotent, it's not good to create an arbitrary number of Futures here (with every `PollApplicationEvent`, a new one is created)!
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,
Review Comment: Couldn't we avoid this long comment and complexity by transitioning through `PREPARE_LEAVING` either way, even if we don't do much? I don't like how a corner case (poll timer expiration) complicates the state machine so much.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -564,7 +587,7 @@ public void transitionToJoining() {
 */
 @Override
 public CompletableFuture leaveGroup() {
-if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {
Review Comment: Do we really want to allow leave group when we are fenced already? As far as I can see, we'd invoke `onPartitionsLost` when being fenced and then again `onPartitionsRevoked` on leave group. Can we just use a `nonInGroup` check here?
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on code in PR #15415: URL: https://github.com/apache/kafka/pull/15415#discussion_r1498553247
## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -169,10 +169,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 startingTimestamp = startingTimestamp)
 }
- // TODO: Enable this test for both protocols when the Jira tracking its failure (KAFKA-16008) is fixed. This
- // is done by setting the @MethodSource value to "getTestQuorumAndGroupProtocolParametersAll"
 @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
Review Comment: This consistently passes locally with this PR, ran it repeatedly.
Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]
lianetm commented on PR #15415: URL: https://github.com/apache/kafka/pull/15415#issuecomment-1958564382 Hey @lucasbru, could you take a look at this one? Thanks!