Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-26 Thread via GitHub


lucasbru merged PR #15415:
URL: https://github.com/apache/kafka/pull/15415


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-23 Thread via GitHub


lianetm commented on PR #15415:
URL: https://github.com/apache/kafka/pull/15415#issuecomment-1961554165

   Closing and re-opening PR to trigger build





Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-23 Thread via GitHub


lianetm closed pull request #15415: KAFKA-16258: callback to release assignment 
when stale member leaves group
URL: https://github.com/apache/kafka/pull/15415





Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-23 Thread via GitHub


lianetm commented on PR #15415:
URL: https://github.com/apache/kafka/pull/15415#issuecomment-1961481081

   Changes done @lucasbru, thanks!





Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-23 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1500782762


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
 ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
 
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and  then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,

Review Comment:
   I see. I was hesitant about forcing that transition with a
`transitionToPrepareLeaving` and also changing the prepare-leaving concept, but I
totally get the gain of reusing the state, and thinking of it slightly
differently convinced me: I reused the state, but instead of exposing it at the
HBMgr level, I kept it internal to the membershipMgr. So we do simplify all those
transitions, we still keep a single
`membershipManager.transitionToSendingLeaveGroup(dueToPollTimerExpired)` call in
the HBMgr, and internally its implementation makes the transient transition
through PREPARE_LEAVING if the poll timer expired. What do you think? I believe
it's a nice result. Thanks for the very helpful feedback!
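The design settled on above can be sketched as follows. This is a minimal, hypothetical model, not the actual `MembershipManagerImpl`: `SketchState`, `SketchMembershipManager` and the `transitionTo` check are illustrative names, and only the method name `transitionToSendingLeaveGroup(dueToPollTimerExpired)` comes from the thread. It shows how a transient hop through PREPARE_LEAVING lets LEAVING keep a single valid predecessor.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified subset of the member states discussed in the thread.
enum SketchState {
    STABLE, RECONCILING, PREPARE_LEAVING, LEAVING, STALE, UNSUBSCRIBED;

    List<SketchState> previousValidStates = Collections.emptyList();

    static {
        PREPARE_LEAVING.previousValidStates = Arrays.asList(STABLE, RECONCILING);
        // Because the poll-timer path also hops through PREPARE_LEAVING,
        // LEAVING keeps PREPARE_LEAVING as its only valid predecessor.
        LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
        STALE.previousValidStates = Arrays.asList(LEAVING);
        UNSUBSCRIBED.previousValidStates = Arrays.asList(LEAVING);
    }
}

class SketchMembershipManager {
    private SketchState state = SketchState.STABLE;

    SketchState state() {
        return state;
    }

    private void transitionTo(SketchState next) {
        if (!next.previousValidStates.contains(state)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }

    // Regular leave (unsubscribe/close): release callbacks run while PREPARE_LEAVING.
    void transitionToPrepareLeaving() {
        transitionTo(SketchState.PREPARE_LEAVING);
    }

    // Single entry point for the heartbeat manager. When the poll timer expired,
    // the hop through PREPARE_LEAVING is an empty, internal step: no callbacks run
    // here, they run later while STALE.
    void transitionToSendingLeaveGroup(boolean dueToPollTimerExpired) {
        if (dueToPollTimerExpired) {
            transitionTo(SketchState.PREPARE_LEAVING);
        }
        transitionTo(SketchState.LEAVING);
    }
}
```

Skipping PREPARE_LEAVING on a regular leave is rejected by the transition check, which is what keeping the predecessor list short buys.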






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-23 Thread via GitHub


lucasbru commented on PR #15415:
URL: https://github.com/apache/kafka/pull/15415#issuecomment-1961282993

   Let me know if you are planning to change something. Otherwise, I think we 
can merge it.





Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-23 Thread via GitHub


lucasbru commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1500621907


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
 ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
 
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and  then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,

Review Comment:
   What I meant was, just do `transitionToPrepareLeaving(); transitionToLeaving()`,
to simplify the state machine. Then, whether we heartbeat or not would not
matter. Just use an empty preparation phase, instead of skipping it entirely.
But all of these are just ideas, I'll leave it to you to decide whether it's
worth it or not.






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on PR #15415:
URL: https://github.com/apache/kafka/pull/15415#issuecomment-1960700348

   Merged the latest trunk changes, which fix the build.





Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on PR #15415:
URL: https://github.com/apache/kafka/pull/15415#issuecomment-1960329387

   Hey @lucasbru. Great push for simplification. I went as far as I could in
unifying the leave operations, while still living with the fact that the expired
timer brings up a leave path that's quite different from the existing one.
   Thanks for the comments, all addressed.





Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499941957


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -564,7 +587,7 @@ public void transitionToJoining() {
  */
 @Override
 public CompletableFuture leaveGroup() {
-if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {

Review Comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-16301 to review the
callback interactions in edge cases of fence+unsubscribe+subscribe.






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499920005


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
 ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
 
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and  then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,

Review Comment:
   I would say we shouldn't transition through PREPARE_LEAVING, because it
represents a member that is still part of the group, which is not the case in
this situation, where the first thing is to send the leave group request and
then everything else. For instance, a member in PREPARE_LEAVING continues to
send HBs, and needs to be considered in all the interactions that could happen
while heartbeating. Of course we could attempt to reuse the state, but it would
probably be confusing to be preparing to leave when we have already sent the
leave group, and it would complicate the now clear decision to HB while
preparing to leave and not HB while STALE. The fundamental difference in the
sequence of the two kinds of leave makes it tricky to reuse the whole path: a
regular leave needs to prepare to leave and then leave. The expiration leave
needs to leave first, and then go to a state where it must do different things
than the prepare leave: it needs to stop HB, release the assignment (the only
step similar to prepare leaving), wait for the timer reset, and then rejoin.
These are all different needs from the prepare leave, except for releasing the
assignment.
   
   Most of these transitions existed before, by the way (as previous states of
STALE); they just moved because we're reusing LEAVING now. So in that sense the
change is not introducing much to the state machine, just moving transitions
to reuse the existing LEAVING. With the change
[8440b3a](https://github.com/apache/kafka/pull/15415/commits/8440b3a405522037f1d7387289c0f2d9271c76cf)
for a unified leave I think it's better, and I removed the comment since I
believe the intention is clearer now with the leave group change. Please take
a look and let me know if it aligns better.
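The distinction drawn above, heartbeating while PREPARE_LEAVING but not while STALE, can be expressed as a single predicate. This is a hypothetical sketch over a reduced set of states; `HbState`, `HeartbeatPolicy` and `shouldHeartbeat` are illustrative names, not Kafka APIs.

```java
// Hypothetical reduced state set; the real MemberState enum has more values.
enum HbState { JOINING, STABLE, RECONCILING, PREPARE_LEAVING, LEAVING, STALE, UNSUBSCRIBED }

final class HeartbeatPolicy {
    private HeartbeatPolicy() {
    }

    // True while the member is still part of the group on the broker.
    static boolean shouldHeartbeat(HbState state) {
        switch (state) {
            case JOINING:
            case STABLE:
            case RECONCILING:
            case PREPARE_LEAVING: // still a member: keeps heartbeating while revoke callbacks run
            case LEAVING:         // sends the one heartbeat that leaves the group
                return true;
            case STALE:           // already removed by the leave heartbeat
            case UNSUBSCRIBED:
            default:
                return false;
        }
    }
}
```

Keeping this decision in one place is what reusing PREPARE_LEAVING for the expired-timer path would have blurred.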






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499894219


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 }
 
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
+membershipManager.onHeartbeatRequestSent();

Review Comment:
   Yes, makes sense, done. I had considered it but wasn't convinced, thinking
that it would complicate the leaving in the membershipMgr, but with your
comment I took another pass and it does seem better actually: just a single
leaving, which in some cases is due to the poll timer expiring (and that allows
the manager to make the right transitions after leaving).






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499559758


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
 return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
 
+@Override
+public void maybeRejoinStaleMember() {
+if (state == MemberState.STALE) {
+staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());

Review Comment:
   Sure, good catch, done. We truly only need it when an expired poll timer is
reset. Note that I added the check in the HB manager (where I'm trying to keep
the poll timer logic), and the membershipMgr is only concerned with the stale
situation (which is more than just the timer concept, because it includes the
callbacks that may take a while).
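The split described above, with the poll-timer check in the heartbeat manager and the stale handling in the membership manager, might look roughly like this. `HeartbeatManagerSketch`, `StaleMemberHandler` and `resetPollTimer` are hypothetical names used for illustration; only `maybeRejoinStaleMember` comes from the diff. The membership manager is notified once per expiration instead of on every poll.

```java
// Hypothetical callback interface standing in for the membership manager.
interface StaleMemberHandler {
    void maybeRejoinStaleMember();
}

class HeartbeatManagerSketch {
    private final long maxPollIntervalMs;
    private final StaleMemberHandler membershipManager;
    private long pollDeadlineMs;

    HeartbeatManagerSketch(long maxPollIntervalMs, long nowMs, StaleMemberHandler membershipManager) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.membershipManager = membershipManager;
        this.pollDeadlineMs = nowMs + maxPollIntervalMs;
    }

    boolean pollTimerExpired(long nowMs) {
        return nowMs >= pollDeadlineMs;
    }

    // Invoked whenever the application polls (hypothetical hook).
    void resetPollTimer(long nowMs) {
        boolean wasExpired = pollTimerExpired(nowMs);
        pollDeadlineMs = nowMs + maxPollIntervalMs;
        if (wasExpired) {
            // Only the reset of an expired timer can unblock a STALE member,
            // so ordinary polls do not reach the membership manager at all.
            membershipManager.maybeRejoinStaleMember();
        }
    }
}
```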






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499490700


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -564,7 +587,7 @@ public void transitionToJoining() {
  */
 @Override
 public CompletableFuture leaveGroup() {
-if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {

Review Comment:
   Good point, I think we could reuse it and it's a good direction. It does
require taking care of some other details: we need to make sure that when the
fencing callbacks complete, we don't blindly transition to joining (which is
what the code currently does). Also, when using `notInGroup` here for a no-op
leave, the fencing case is a bit different from the others, because we do need
to transition to UNSUBSCRIBED (other states remain unchanged if a leave group
call happens). All done.
   
   This suggestion also makes me think about another tricky situation, which I
would like to review separately if that's ok. We could get a leave group while
fenced, then the user subscribes again, and the member rejoins while
onPartitionsLost may not have completed yet. The concern is that we could get an
assignment and end up running onPartitionsAssigned while onPartitionsLost is
still running. That's not the behaviour of the old consumer (callback execution
blocks in the coordinator). I will file a jira to review it more carefully, as
it would definitely complicate things further (as callbacks always do).
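The no-op leave described above can be sketched as below. This is a hypothetical, simplified model: `LeaveGroupSketch`, `GroupState`, `onFencingCallbacksCompleted` and this body of `notInGroup` are illustrative, not the actual Kafka code. A leave while out of the group sends nothing and triggers no `onPartitionsRevoked`, but a FENCED member still moves to UNSUBSCRIBED, so the completion of its `onPartitionsLost` callback no longer leads to a rejoin.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical reduced state set.
enum GroupState { STABLE, FENCED, STALE, FATAL, UNSUBSCRIBED, PREPARE_LEAVING, JOINING }

class LeaveGroupSketch {
    GroupState state;

    LeaveGroupSketch(GroupState initial) {
        state = initial;
    }

    boolean notInGroup() {
        return state == GroupState.UNSUBSCRIBED || state == GroupState.FENCED
                || state == GroupState.STALE || state == GroupState.FATAL;
    }

    CompletableFuture<Void> leaveGroup() {
        if (notInGroup()) {
            // Already out of the group: no leave heartbeat, no onPartitionsRevoked.
            // Only a fenced member changes state, so it will not rejoin later.
            if (state == GroupState.FENCED) {
                state = GroupState.UNSUBSCRIBED;
            }
            return CompletableFuture.completedFuture(null);
        }
        state = GroupState.PREPARE_LEAVING;
        // In a real client this would complete once the leave heartbeat is sent.
        return new CompletableFuture<>();
    }

    // Runs when onPartitionsLost finishes after fencing.
    void onFencingCallbacksCompleted() {
        // Rejoin only if nothing (e.g. a leave group call) changed the state meanwhile.
        if (state == GroupState.FENCED) {
            state = GroupState.JOINING;
        }
    }
}
```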






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-22 Thread via GitHub


lucasbru commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499180116


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 }
 
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
+membershipManager.onHeartbeatRequestSent();

Review Comment:
   Would it not be more consistent to run this as a leave heartbeat as well,
and just handle the state correctly internally?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
 return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
 
+@Override
+public void maybeRejoinStaleMember() {
+if (state == MemberState.STALE) {
+staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());

Review Comment:
   Can we avoid enqueuing many of these completion stages while we haven't
finished the `staleMemberAssignmentRelease`? Although the operation looks
idempotent, it's not good to create an arbitrary number of futures here (a new
one is created with every `PollApplicationEvent`!).
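One way to address this concern, sketched under assumptions, is a guard that attaches the completion stage at most once per stale episode. The resolution in this thread instead moved the check into the heartbeat manager; the `rejoinScheduled` flag and the `StaleRejoinSketch` class below are hypothetical illustrations that use only standard `CompletableFuture` methods.

```java
import java.util.concurrent.CompletableFuture;

class StaleRejoinSketch {
    enum State { STALE, JOINING }

    State state = State.STALE;
    final CompletableFuture<Void> staleMemberAssignmentRelease = new CompletableFuture<>();
    // Hypothetical guard: set once the rejoin stage has been attached.
    // A real implementation would clear it whenever the member becomes STALE again.
    private boolean rejoinScheduled = false;
    // Counts attached stages, only to make the effect of the guard observable.
    int attachedStages = 0;

    // May be called on every poll event; attaches at most one dependent stage.
    void maybeRejoinStaleMember() {
        if (state != State.STALE || rejoinScheduled) {
            return;
        }
        rejoinScheduled = true;
        attachedStages++;
        staleMemberAssignmentRelease.whenComplete((ignored, error) -> state = State.JOINING);
    }
}
```

Completing the future runs the single attached stage on the completing thread, so the member moves to JOINING exactly once regardless of how many polls happened while the callbacks were running.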



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
 ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
 
-LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+// Transition from prepare leaving to leaving is the expected one in all close operations
+// except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+// close, where member triggers callbacks first while it continues sending heartbeat
+// (PREPARE_LEAVE state) and  then sends the heartbeat to leave (LEAVING state).
+// All other transitions directly to LEAVING are expected when the member leaves due to
+// expired poll timer. In that case, the member sends the heartbeat to leave first, and
+// then invokes callbacks to release assignment while STALE, not sending any more
+// heartbeats while STALE because it has been already removed from the group on the broker.
+LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,

Review Comment:
   Couldn't we avoid this long comment and complexity by transitioning through
`PREPARE_LEAVING` either way, even if we don't do much there? I don't like how a
corner case (poll timer expiration) complicates the state machine so much.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -564,7 +587,7 @@ public void transitionToJoining() {
  */
 @Override
 public CompletableFuture leaveGroup() {
-if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {

Review Comment:
   Do we really want to allow leave group when we are already fenced? As far as
I can see, we'd invoke `onPartitionsLost` when being fenced and then also
`onPartitionsRevoked` on leave group.
   
   Can we just use a `notInGroup` check here?






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-21 Thread via GitHub


lianetm commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1498553247


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -169,10 +169,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   startingTimestamp = startingTimestamp)
   }
 
-  // TODO: Enable this test for both protocols when the Jira tracking its failure (KAFKA-16008) is fixed. This
-  //   is done by setting the @MethodSource value to "getTestQuorumAndGroupProtocolParametersAll"
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))

Review Comment:
   This test consistently passes locally with this PR; I ran it repeatedly.






Re: [PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-21 Thread via GitHub


lianetm commented on PR #15415:
URL: https://github.com/apache/kafka/pull/15415#issuecomment-1958564382

   Hey @lucasbru , could you take a look at this one? Thanks!





[PR] KAFKA-16258: callback to release assignment when stale member leaves group [kafka]

2024-02-21 Thread via GitHub


lianetm opened a new pull request, #15415:
URL: https://github.com/apache/kafka/pull/15415

   Introduce a call to the `onPartitionsLost` callback to release the assignment
when a consumer proactively leaves the group because its poll timer expired.
   
   When the poll timer expires, the member sends a leave group request (reusing
the existing LEAVING state and logic), and then transitions to STALE to release
its assignment and wait for the poll timer to be reset. Once both conditions are
met, the consumer transitions out of the STALE state to rejoin the group. Note
that while in the STALE state, the member is not part of the group, so it does
not send heartbeats.
   
   This PR also includes a fix to ensure that heartbeat responses received while
STALE, or in any other state where the member is not in the group, are ignored.
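The flow described in this PR can be summarized as a small, hypothetical state holder. Class, field and method names here are illustrative assumptions, not the consumer's internals: the leave heartbeat goes out first, the member then sits in STALE without heartbeating, rejoins only after both the assignment release (`onPartitionsLost`) and a poll timer reset, and ignores heartbeat responses while it is not in the group.

```java
class StaleFlowSketch {
    enum State { STABLE, LEAVING, STALE, JOINING, UNSUBSCRIBED, FENCED, FATAL }

    State state = State.STABLE;
    private boolean assignmentReleased = false;
    private boolean pollTimerReset = false;
    int appliedResponses = 0;

    void onPollTimerExpired() {
        state = State.LEAVING; // the leave heartbeat is sent first
    }

    void onLeaveHeartbeatSent() {
        state = State.STALE; // out of the group: no more heartbeats
    }

    void onAssignmentReleased() { // onPartitionsLost completed
        assignmentReleased = true;
        maybeRejoin();
    }

    void onPollTimerReset() {
        pollTimerReset = true;
        maybeRejoin();
    }

    // Both conditions must hold before leaving STALE.
    private void maybeRejoin() {
        if (state == State.STALE && assignmentReleased && pollTimerReset) {
            state = State.JOINING;
        }
    }

    boolean notInGroup() {
        return state == State.STALE || state == State.UNSUBSCRIBED
                || state == State.FENCED || state == State.FATAL;
    }

    void onHeartbeatResponse(Object response) {
        if (notInGroup()) {
            return; // late response for a member that already left: ignore it
        }
        appliedResponses++; // stands in for applying the response
    }
}
```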

