Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-16 Thread via GitHub


lucasbru merged PR #15375:
URL: https://github.com/apache/kafka/pull/15375


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-15 Thread via GitHub


lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491231865


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
 return currentAssignment.equals(currentTargetAssignment);
 }
 
+/**
+ * @return True if the member should not send heartbeats, which would be one of the following
+ * cases:
+ * 
+ * Member is not subscribed to any topics
+ * Member has received a fatal error in a previous heartbeat response
+ * Member is stale, meaning that it has left the group due to expired poll timer
+ * 
+ */
 @Override
 public boolean shouldSkipHeartbeat() {
 MemberState state = state();
-return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+}
+
+/**
+ * @return True if the member is preparing to leave the group (waiting for callbacks), or
+ * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+ * the consumer poll timer expires.
+ */
+public boolean isLeavingGroup() {
+MemberState state = state();
+return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
 
 /**

Review Comment:
   You're right, fixed it here and on the state description, mentioning the 
application poll event, which is really what it takes.






Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-15 Thread via GitHub


lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491222723


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
 @Override
 public void onHeartbeatRequestSent() {
 MemberState state = state();
-if (isStaled()) {
-log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-// TODO: Integrate partition revocation/loss callback
-transitionToJoining();
-return;
-}
-

Review Comment:
   Do you mean a comment here? The handling is spread all over the manager really, but 
the main logic is in the transition functions I would say: 
`transitionToJoining`, `transitionToFenced`, `transitionToFatal`, 
`transitionToSendingLeaveGroup`, `transitionToUnsubscribed`, 
`transitionToStale`, and of course `reconcile`. Each state-handling function 
usually needs to take actions specific to the transition, while also 
considering how things might have changed. This is a state machine that changes 
with inputs from two sides (API calls and broker HB responses), so lots of things 
can change at any time while we transition from A to B.
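To make the "inputs from two sides" point concrete, here is a minimal sketch of a state machine that validates every transition. The enum `SketchState`, the class `SketchStateMachine` and its transition table are hypothetical, built only from transitions mentioned in this thread (for example LEAVING to UNSUBSCRIBED, STALE to JOINING); the real manager has more states and its own rules.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, trimmed-down states; the real MemberState enum has more values.
enum SketchState { UNSUBSCRIBED, JOINING, STABLE, PREPARE_LEAVING, LEAVING, STALE }

final class SketchStateMachine {
    // Illustrative allowed-transition table, built only from transitions
    // mentioned in this thread. It is NOT the table used by MembershipManagerImpl.
    private static final Map<SketchState, Set<SketchState>> ALLOWED = new EnumMap<>(SketchState.class);

    static {
        ALLOWED.put(SketchState.UNSUBSCRIBED, EnumSet.of(SketchState.JOINING));
        ALLOWED.put(SketchState.JOINING, EnumSet.of(SketchState.STABLE, SketchState.STALE));
        ALLOWED.put(SketchState.STABLE, EnumSet.of(SketchState.PREPARE_LEAVING, SketchState.STALE));
        ALLOWED.put(SketchState.PREPARE_LEAVING, EnumSet.of(SketchState.LEAVING));
        ALLOWED.put(SketchState.LEAVING, EnumSet.of(SketchState.UNSUBSCRIBED));
        ALLOWED.put(SketchState.STALE, EnumSet.of(SketchState.JOINING));
    }

    private SketchState state = SketchState.UNSUBSCRIBED;

    SketchState state() {
        return state;
    }

    // Each transitionToX-style helper would funnel through this check, so an input
    // (API call or heartbeat response) arriving in an unexpected state fails fast
    // instead of silently corrupting the member state.
    void transitionTo(SketchState next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Invalid state transition from " + state + " to " + next);
        }
        state = next;
    }
}
```

With a table like this, a member that has already reached LEAVING and then sees its poll timer expire gets an `IllegalStateException` instead of moving to STALE, which is the kind of leaving->stale transition this PR guards against.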






Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-15 Thread via GitHub


lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491204862


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -335,8 +335,13 @@ public int memberEpoch() {
 return memberEpoch;
 }
 
+/**
+ * @return True if there hasn't been a call to consumer.poll() within the max.poll.interval.
+ * In that case, it is expected that the member will leave the group and rejoin on the next
+ * call to consumer.poll().
+ */
 @Override
-public boolean isStaled() {
+public boolean isStale() {

Review Comment:
   I would guess that @philipnee preferred the convenience method when he added 
it, to use it from the HBManager. That being said, your comment made me notice 
that `isStale` is not used there anymore, so I'm removing it.






Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-15 Thread via GitHub


lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491192487


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent() ||
-membershipManager.shouldSkipHeartbeat() ||
-pollTimer.isExpired()) {
+membershipManager.shouldSkipHeartbeat()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
-logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-"was longer than the configured max.poll.interval.ms, which typically implies that " +
-"the poll loop is spending too much time processing messages. You can address this " +
-"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-"returned in poll() with max.poll.records.");
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the time between " +
+"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too much time processing " +
+"messages. You can address this either by increasing max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
   We want both conditions together, because we do want to continue down to ln 
212 if the timer expired and the member is leaving, basically letting the 
ongoing leave operation win over the expired poll. That would be one of these 
cases:
   - timer expired and member is in PREPARE_LEAVING: we do want to continue 
sending HB while preparing to leave, until the callback completes and the member 
transitions to LEAVING.
   - timer expired and member is in LEAVING: we do want to send the last HB 
that LEAVING sends, and then transition out of LEAVING to UNSUBSCRIBED.
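The reasoning above can be condensed into a small decision function. This is a hedged sketch, not the actual `HeartbeatRequestManager.poll()`: `HbMemberState`, `HbAction` and `HeartbeatDecision.decide` are hypothetical names, and the real method logs, transitions and builds requests (returning a `NetworkClientDelegate.PollResult`) rather than returning an enum.

```java
// Hypothetical, simplified stand-ins for the real client types.
enum HbMemberState { JOINING, STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED, FATAL, STALE }

enum HbAction { SKIP, LEAVE_ON_POLL_EXPIRATION, SEND_HEARTBEAT }

final class HeartbeatDecision {
    private HeartbeatDecision() {
    }

    // Same states as shouldSkipHeartbeat() in the diff.
    static boolean shouldSkipHeartbeat(HbMemberState s) {
        return s == HbMemberState.UNSUBSCRIBED || s == HbMemberState.FATAL || s == HbMemberState.STALE;
    }

    // Same states as isLeavingGroup() in the diff.
    static boolean isLeavingGroup(HbMemberState s) {
        return s == HbMemberState.PREPARE_LEAVING || s == HbMemberState.LEAVING;
    }

    // Mirrors the order of checks in the patched poll(): skip first, then the
    // expired-timer branch only for members that are not already leaving.
    static HbAction decide(boolean coordinatorKnown, HbMemberState s, boolean pollTimerExpired) {
        if (!coordinatorKnown || shouldSkipHeartbeat(s)) {
            return HbAction.SKIP;
        }
        if (pollTimerExpired && !isLeavingGroup(s)) {
            // Poll-timer expiration logic: warn, transition to STALE, send the leave heartbeat.
            return HbAction.LEAVE_ON_POLL_EXPIRATION;
        }
        // Also reached when the timer expired in PREPARE_LEAVING or LEAVING:
        // the ongoing leave operation wins and heartbeating continues.
        return HbAction.SEND_HEARTBEAT;
    }
}
```

Nesting the leaving check inside the expired-timer branch would give the same result only if the nested branch fell through to the normal heartbeat path; writing the two conditions together makes that fall-through explicit.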






Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-15 Thread via GitHub


lianetm commented on PR #15375:
URL: https://github.com/apache/kafka/pull/15375#issuecomment-1946314919

   Hey @lucasbru, agree that the callback invocation is missing, I had filed 
https://issues.apache.org/jira/browse/KAFKA-16258 to tackle that in a following 
PR if that's ok (mostly because I expect it should reuse some existing logic 
for callbacks, so I will refactor that code a bit to reuse it, and it will need 
specific testing for the poll timer callback, so thought we could solve the 
invalid transition here, and add the callbacks right after).
   
   Regarding your concern with the state, the key is that the STALE state, even 
though it sends a leave group request, is more like FENCED or FATAL with regard 
to callbacks and heartbeats:
   - When the member gets fenced/fatal/stale, it triggers `onPartitionsLost` 
(not the revoked callback), and it also stops sending HB. This makes it much 
simpler: we should not need any extra state like prepare_stale. The 
PREPARE_LEAVING state is needed because, while executing callbacks to leave, the 
member remains active and keeps sending HB, so lots of things could happen on the 
broker side in the meantime (like getting fenced or a fatal error) that we need to 
handle differently because we know the member is already preparing to leave.
   
   Also note that the member could remain STALE for a while after sending the 
HB to leave the group and executing onPartitionsLost, so we don't want to 
"rejoinAferLeave". We want to remain in that STALE state (meaning that it is 
not in the group, just like UNSUBSCRIBED, but with the difference that they 
require different actions to transition out). The member should transition out 
of STALE and rejoin only when the poll timer is reset with the app level poll 
event.
   
   Makes sense? I would expect that with KAFKA-16258 we end up integrating a 
call to onPartitionsLost after sending the HB to leave, to keep the same 
sequence as the legacy coordinator. I added the pointers to the legacy logic in 
the task description. 
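A minimal sketch of the STALE lifecycle described above, assuming hypothetical names (`LcState`, `StaleLifecycle`, `onHeartbeatTick`, `onApplicationPoll`) and recording actions as strings instead of sending requests. The `onPartitionsLost` step reflects the sequence expected once KAFKA-16258 is done, not behaviour added by this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down states for illustration only.
enum LcState { STABLE, STALE, JOINING }

final class StaleLifecycle {
    private LcState state = LcState.STABLE;
    private final List<String> events = new ArrayList<>();

    LcState state() {
        return state;
    }

    List<String> events() {
        return events;
    }

    // Poll timer expired while not leaving: send the leave heartbeat, then
    // invoke onPartitionsLost (not onPartitionsRevoked), as with FENCED/FATAL.
    void onPollTimerExpired() {
        state = LcState.STALE;
        events.add("send-leave-heartbeat");
        events.add("onPartitionsLost");
    }

    // Background heartbeat ticks do nothing while STALE: no heartbeats
    // and no automatic rejoin (no "rejoin after leave").
    void onHeartbeatTick() {
        if (state == LcState.STALE) {
            return;
        }
        events.add("heartbeat");
    }

    // Only the application-level poll event resets the poll timer and
    // moves the member out of STALE so it can rejoin the group.
    void onApplicationPoll() {
        if (state == LcState.STALE) {
            state = LcState.JOINING;
            events.add("rejoin");
        }
    }
}
```

Keeping STALE as a resting state (rather than jumping straight to JOINING after the leave heartbeat) is what lets the member sit outside the group for as long as the application goes without polling.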





Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-14 Thread via GitHub


AndrewJSchofield commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
 return currentAssignment.equals(currentTargetAssignment);
 }
 
+/**
+ * @return True if the member should not send heartbeats, which would be one of the following
+ * cases:
+ * 
+ * Member is not subscribed to any topics
+ * Member has received a fatal error in a previous heartbeat response
+ * Member is stale, meaning that it has left the group due to expired poll timer
+ * 
+ */
 @Override
 public boolean shouldSkipHeartbeat() {
 MemberState state = state();
-return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+}
+
+/**
+ * @return True if the member is preparing to leave the group (waiting for callbacks), or
+ * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+ * the consumer poll timer expires.
+ */
+public boolean isLeavingGroup() {
+MemberState state = state();
+return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
 
 /**

Review Comment:
   I think that technically, it doesn't take the *user* to poll in order to 
rejoin. The code polls more frequently if the user's poll timeout is long 
enough.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent() ||
-membershipManager.shouldSkipHeartbeat() ||
-pollTimer.isExpired()) {
+membershipManager.shouldSkipHeartbeat()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
-logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-"was longer than the configured max.poll.interval.ms, which typically implies that " +
-"the poll loop is spending too much time processing messages. You can address this " +
-"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-"returned in poll() with max.poll.records.");
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the time between " +
+"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too much time processing " +
+"messages. You can address this either by increasing max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
   I think the logic as written is correct.






Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-14 Thread via GitHub


kirktrue commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent() ||
-membershipManager.shouldSkipHeartbeat() ||
-pollTimer.isExpired()) {
+membershipManager.shouldSkipHeartbeat()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
-logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-"was longer than the configured max.poll.interval.ms, which typically implies that " +
-"the poll loop is spending too much time processing messages. You can address this " +
-"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-"returned in poll() with max.poll.records.");
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the time between " +
+"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too much time processing " +
+"messages. You can address this either by increasing max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
   Should the `!membershipManager.isLeavingGroup()` check be a nested `if` 
_inside_ the `if (pollTimer.isExpired())`? That is, do we really want to 
continue down to line 212 if the timer is expired but it's not already leaving 
the group?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -335,8 +335,13 @@ public int memberEpoch() {
 return memberEpoch;
 }
 
+/**
+ * @return True if there hasn't been a call to consumer.poll() within the max.poll.interval.
+ * In that case, it is expected that the member will leave the group and rejoin on the next
+ * call to consumer.poll().
+ */
 @Override
-public boolean isStaled() {
+public boolean isStale() {

Review Comment:
   Thank you!  



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -335,8 +335,13 @@ public int memberEpoch() {
 return memberEpoch;
 }
 
+/**
+ * @return True if there hasn't been a call to consumer.poll() within the max.poll.interval.
+ * In that case, it is expected that the member will leave the group and rejoin on the next
+ * call to consumer.poll().
+ */
 @Override
-public boolean isStaled() {
+public boolean isStale() {

Review Comment:
   Any reason we don't just expose the inner state via a `state()` method so 
that we don't have to write `isStateA`, `isStateB`, `isStateC`,  etc.?
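`MembershipManager` already declares `MemberState state()`, so callers could test the state directly. One middle ground, sketched below with hypothetical names (`PState`, `StateView`, `StatePredicates`), keeps the named predicates but defines each one as an immutable `EnumSet` over `state()`, so the grouping of states is documented in a single place.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, trimmed-down state enum for illustration only.
enum PState { JOINING, STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED, FATAL, STALE }

// Stand-in for the single accessor MembershipManager already exposes.
interface StateView {
    PState state();
}

final class StatePredicates {
    // Each named group records why a set of states belongs together.
    static final Set<PState> SKIP_HEARTBEAT =
            Collections.unmodifiableSet(EnumSet.of(PState.UNSUBSCRIBED, PState.FATAL, PState.STALE));
    static final Set<PState> LEAVING_GROUP =
            Collections.unmodifiableSet(EnumSet.of(PState.PREPARE_LEAVING, PState.LEAVING));

    private StatePredicates() {
    }

    static boolean shouldSkipHeartbeat(StateView member) {
        return SKIP_HEARTBEAT.contains(member.state());
    }

    static boolean isLeavingGroup(StateView member) {
        return LEAVING_GROUP.contains(member.state());
    }
}
```

The trade-off is mostly readability: a call like `shouldSkipHeartbeat(member)` tells the reader why the check exists, whereas an inline `EnumSet.of(...)` repeated at each call site does not and can drift out of sync.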



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
 @Override
 public void onHeartbeatRequestSent() {
 MemberState state = state();
-if (isStaled()) {
-log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-// TODO: Integrate partition revocation/loss callback
-transitionToJoining();
-return;
-}
-

Review Comment:
   For the uninitiated (like myself), would you consider adding a really brief 
comment that provides pointers to where some of the other states (e.g. `STALE`) 
are handled?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager {
 MemberState state();
 
 /**
- * @return True if the member is staled due to expired poll timer.
+ * @return True if the poll timer expired, indicating that there hasn't been a call to
+ * consumer poll within the max poll interval. In this case, the member will proactively
+ * leave the group, and rejoin on the next call to poll.
  */
-boolean isStaled();
+boolean isStale();

Review Comment:
   Thank you! 
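The poll timer semantics in this Javadoc can be illustrated with a tiny timer. `SketchPollTimer` is a hypothetical stand-in that mimics the `update(currentTimeMs)` / `isExpired()` calls used in the `HeartbeatRequestManager` diff; it is not Kafka's `org.apache.kafka.common.utils.Timer`, and `resetOnApplicationPoll` is an assumed name for what the application poll event does.

```java
// Hypothetical stand-in for the consumer poll timer; not Kafka's Timer class.
final class SketchPollTimer {
    private final long maxPollIntervalMs;
    private long currentTimeMs;
    private long deadlineMs;

    SketchPollTimer(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.currentTimeMs = nowMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    // Advance the timer's notion of "now" (e.g. on each heartbeat manager poll).
    // Time never moves backwards.
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    // The application called consumer.poll(): restart the max poll interval from now.
    void resetOnApplicationPoll(long nowMs) {
        update(nowMs);
        deadlineMs = currentTimeMs + maxPollIntervalMs;
    }

    // True once max.poll.interval.ms has elapsed without an application poll,
    // i.e. the condition under which the member is considered stale.
    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }
}
```

Note that only the application poll resets the deadline; the background heartbeat loop merely calls `update`, which is why a slow processing loop eventually trips the timer even while heartbeats keep flowing.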




[PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-14 Thread via GitHub


lianetm opened a new pull request, #15375:
URL: https://github.com/apache/kafka/pull/15375

   This fixes an invalid transition (leaving->stale) that was discovered in the 
system tests. The underlying issue was that the poll timer expiration logic was 
blindly forcing a transition to stale and sending a leave group, without 
considering that the member could be already leaving. 
   The fix included in this PR ensures that the poll timer expiration logic, 
whose purpose is to leave the group, is only applied if the member is not 
already leaving. Note that it also fixes the transition out of the STALE state, 
which should only happen when the poll timer is reset. 
   
   As a result of these changes:
   - If the poll timer expires while the member is not leaving, the poll timer 
expiration logic is applied: the member transitions to STALE, sends a leave group 
request, and remains in the STALE state until the timer is reset. At that point 
the member transitions to JOINING to rejoin the group. 
   - If the poll timer expires while the member is already leaving, the poll 
timer expiration logic does not apply, and the HB simply continues. Note that 
this would be the case of a member in PREPARE_LEAVING waiting for callbacks to 
complete (needs to continue sending HB), or in LEAVING (needs to send the last HB 
to leave).

