Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lucasbru merged PR #15375: URL: https://github.com/apache/kafka/pull/15375 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491231865

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
         return currentAssignment.equals(currentTargetAssignment);
     }
 
+    /**
+     * @return True if the member should not send heartbeats, which would be one of the following
+     * cases:
+     *
+     * Member is not subscribed to any topics
+     * Member has received a fatal error in a previous heartbeat response
+     * Member is stale, meaning that it has left the group due to expired poll timer
+     *
+     */
     @Override
     public boolean shouldSkipHeartbeat() {
         MemberState state = state();
-        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group (waiting for callbacks), or
+     * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+     * the consumer poll timer expires.
+     */
+    public boolean isLeavingGroup() {
+        MemberState state = state();
+        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
     }
 
     /**

Review Comment:
You're right, fixed it here and on the state description, mentioning the application poll event, which is really what it takes.
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491222723

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
     @Override
     public void onHeartbeatRequestSent() {
         MemberState state = state();
-        if (isStaled()) {
-            log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-            // TODO: Integrate partition revocation/loss callback
-            transitionToJoining();
-            return;
-        }
-

Review Comment:
Do you mean comment here? The handling is all over the manager really, but the main logic is in the transition functions I would say: `transitionToJoining`, `transitionToFenced`, `transitionToFatal`, `transitionToSendingLeaveGroup`, `transitionToUnsubscribed`, `transitionToStale`, and the `reconcile` of course.

Each state handling func usually needs to take specific actions for the transitions, but also considering how things might have changed. This is a state machine that changes with inputs from 2 sides: API calls, and broker HB responses, so lots of things can change, anytime, while we transition from A to B.
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491204862

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -335,8 +335,13 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval.
+     * In that case, it is expected that the member will leave the group and rejoin on the next
+     * call to consumer.poll().
+     */
     @Override
-    public boolean isStaled() {
+    public boolean isStale() {

Review Comment:
I would guess that @philipnee preferred the convenience method when he added it, to use it from the HBManager. That being said, your comment made me notice that the isStale is not used anymore there, so removing it
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1491192487

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
We want both conditions together, because we do want to continue down to ln 212 if the timer expired and the member is leaving, basically letting the ongoing leaving operation win over the expired poll.

That would be one of these cases:
- timer expired and member is in PREPARE_LEAVING: we do want to continue sending HB while preparing to leave, until the callback completes and then transition to LEAVING.
- timer expired and member is in LEAVING: we do want to send that last HB that the LEAVING sends, and then transition out of the LEAVING to UNSUBSCRIBED
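Editor's note: the two cases in the comment above can be sketched as a small decision function. This is only an illustration of the combined condition, not the actual `HeartbeatRequestManager` code. The class `PollExpiryGuardSketch`, the `Action` enum and the `onPoll` method are hypothetical names, and the `MemberState` enum here is a reduced stand-in for the real one.

```java
public class PollExpiryGuardSketch {

    // Reduced stand-in for the client's member states (assumption: only the
    // states needed for this illustration are listed).
    public enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, STALE, UNSUBSCRIBED, FATAL }

    // Hypothetical outcome of one pass through the heartbeat manager's poll.
    public enum Action { SKIP_HEARTBEAT, LEAVE_DUE_TO_EXPIRED_POLL, CONTINUE_HEARTBEAT }

    // Mirrors the shouldSkipHeartbeat() condition quoted in the diff.
    public static boolean shouldSkipHeartbeat(MemberState state) {
        return state == MemberState.UNSUBSCRIBED
            || state == MemberState.FATAL
            || state == MemberState.STALE;
    }

    // Mirrors the isLeavingGroup() condition quoted in the diff.
    public static boolean isLeavingGroup(MemberState state) {
        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
    }

    // Both conditions are checked together: an expired timer only triggers the
    // proactive leave when the member is not already on its way out.
    public static Action onPoll(boolean coordinatorKnown, boolean pollTimerExpired, MemberState state) {
        if (!coordinatorKnown || shouldSkipHeartbeat(state)) {
            return Action.SKIP_HEARTBEAT;
        }
        if (pollTimerExpired && !isLeavingGroup(state)) {
            return Action.LEAVE_DUE_TO_EXPIRED_POLL;
        }
        // Timer not expired, or member already leaving: the regular heartbeat
        // path continues, so the ongoing leave wins over the expired poll.
        return Action.CONTINUE_HEARTBEAT;
    }

    public static void main(String[] args) {
        for (MemberState state : MemberState.values()) {
            System.out.println(state + " with expired timer: " + onPoll(true, true, state));
        }
    }
}
```

With a nested `if`, an expired timer for a member in PREPARE_LEAVING or LEAVING would need its own branch to reach the heartbeat path. Combining the two checks lets those members fall through without one.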
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm commented on PR #15375:
URL: https://github.com/apache/kafka/pull/15375#issuecomment-1946314919

Hey @lucasbru, agree that the callback invocation is missing, I had filed https://issues.apache.org/jira/browse/KAFKA-16258 to tackle that in a following PR if that's ok (mostly because I expect it should reuse some existing logic for callbacks, so I will refactor that code a bit to reuse it, and it will need specific testing for the poll timer callback, so thought we could solve the invalid transition here, and add the callbacks right after).

Regarding your concern with the state, the key is that STALE state, even though it sends a leave group request, is more like the FENCED or FATAL regarding callbacks and heartbeat:
- When member gets fenced/fatal/stale, it triggers the `onPartitionsLost` (not the revoked), and it also stops sending HB.

This means that it's much simpler, we should not need any extra state like prepare_stale. The PREPARE_LEAVING is needed because while executing callbacks to leave, the member remains active, sending HB, so lots of things could happen from the broker side in the meantime, like fenced, fatal error, that we need to handle differently because we know the member is already preparing to leave.

Also note that the member could remain STALE for a while after sending the HB to leave the group and executing onPartitionsLost, so we don't want to "rejoinAferLeave". We want to remain in that STALE state (meaning that it is not in the group, just like UNSUBSCRIBED, but with the difference that they require different actions to transition out). The member should transition out of STALE and rejoin only when the poll timer is reset with the app level poll event. Makes sense?

I would expect that with KAFKA-16258 we end up integrating a call to onPartitionsLost after sending the HB to leave, to keep the same sequence as the legacy coordinator. I added the pointers to the legacy logic in the task description.
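Editor's note: the STALE lifecycle described in the comment above can be sketched as a toy state holder. It is a simplification under stated assumptions: the class `StaleLifecycleSketch` and its methods `onPollTimerExpired` and `onPollTimerReset` are hypothetical, the enum is a reduced stand-in, and the `partitionsLostInvoked` flag only marks where the `onPartitionsLost` callback is expected to go once KAFKA-16258 is done. The ordering of the leave heartbeat relative to the state change is not modelled.

```java
public class StaleLifecycleSketch {

    // Reduced stand-in for the client's member states (assumption).
    public enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, STALE, JOINING, UNSUBSCRIBED }

    private MemberState state = MemberState.STABLE;

    // Placeholder for the onPartitionsLost callback, which the thread defers
    // to a follow-up change; here it is only recorded as a flag.
    private boolean partitionsLostInvoked = false;

    public MemberState state() {
        return state;
    }

    public boolean partitionsLostInvoked() {
        return partitionsLostInvoked;
    }

    // A STALE member is out of the group, so it sends no heartbeats,
    // just like an UNSUBSCRIBED one.
    public boolean shouldSkipHeartbeat() {
        return state == MemberState.STALE || state == MemberState.UNSUBSCRIBED;
    }

    // Hypothetical hook: the poll timer expired. A member that is already
    // leaving is left alone; any other member becomes STALE.
    public void onPollTimerExpired() {
        if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
            return;
        }
        state = MemberState.STALE;
        partitionsLostInvoked = true;
    }

    // Hypothetical hook: the application polled again and the timer was reset.
    // This is the only way out of STALE in this sketch.
    public void onPollTimerReset() {
        if (state == MemberState.STALE) {
            state = MemberState.JOINING;
        }
    }

    public static void main(String[] args) {
        StaleLifecycleSketch member = new StaleLifecycleSketch();
        member.onPollTimerExpired();
        System.out.println("after expiry: " + member.state());
        member.onPollTimerReset();
        System.out.println("after reset: " + member.state());
    }
}
```

No intermediate "prepare" state is needed because the member stops heartbeating as soon as it becomes STALE, so there are no broker responses to handle while it waits.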
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
AndrewJSchofield commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
         return currentAssignment.equals(currentTargetAssignment);
     }
 
+    /**
+     * @return True if the member should not send heartbeats, which would be one of the following
+     * cases:
+     *
+     * Member is not subscribed to any topics
+     * Member has received a fatal error in a previous heartbeat response
+     * Member is stale, meaning that it has left the group due to expired poll timer
+     *
+     */
     @Override
     public boolean shouldSkipHeartbeat() {
         MemberState state = state();
-        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group (waiting for callbacks), or
+     * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+     * the consumer poll timer expires.
+     */
+    public boolean isLeavingGroup() {
+        MemberState state = state();
+        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
     }
 
     /**

Review Comment:
I think that technically, it doesn't take the *user* to poll in order to rejoin. The code polls more frequently if the user's poll timeout is long enough.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
I think the logic as written is correct.
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
kirktrue commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
Should the `!membershipManager.isLeavingGroup()` if clause be a nested `if` _inside_ the `if (pollTimer.isExpired()`? That is, do we really want to continue down to line 212 if the timer is expired but it's not already leaving the group?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -335,8 +335,13 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval.
+     * In that case, it is expected that the member will leave the group and rejoin on the next
+     * call to consumer.poll().
+     */
     @Override
-    public boolean isStaled() {
+    public boolean isStale() {

Review Comment:
Thank you!

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -335,8 +335,13 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval.
+     * In that case, it is expected that the member will leave the group and rejoin on the next
+     * call to consumer.poll().
+     */
     @Override
-    public boolean isStaled() {
+    public boolean isStale() {

Review Comment:
Any reason we don't just expose the inner state via a `state()` method so that we don't have to write `isStateA`, `isStateB`, `isStateC`, etc.?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
     @Override
     public void onHeartbeatRequestSent() {
         MemberState state = state();
-        if (isStaled()) {
-            log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
-            // TODO: Integrate partition revocation/loss callback
-            transitionToJoining();
-            return;
-        }
-

Review Comment:
For the uninitiated (like myself), would you consider adding a really brief comment that provides pointers to where some of the other states (e.g. `STALE`) are handled?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ##
@@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager {
     MemberState state();
 
     /**
-     * @return True if the member is staled due to expired poll timer.
+     * @return True if the poll timer expired, indicating that there hasn't been a call to
+     * consumer poll within the max poll interval. In this case, the member will proactively
+     * leave the group, and rejoin on the next call to poll.
      */
-    boolean isStaled();
+    boolean isStale();

Review Comment:
Thank you!
[PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm opened a new pull request, #15375:
URL: https://github.com/apache/kafka/pull/15375

This fixes an invalid transition (leaving->stale) that was discovered in the system tests. The underlying issue was that the poll timer expiration logic was blindly forcing a transition to stale and sending a leave group, without considering that the member could already be leaving.

The fix included in this PR ensures that the poll timer expiration logic, whose purpose is to leave the group, is only applied if the member is not already leaving. Note that it also fixes the transition out of the STALE state, which should only happen when the poll timer is reset.

As a result of these changes:
- If the poll timer expires while the member is not leaving, the poll timer expiration logic is applied: it will transition to stale, send a leave group, and remain in STALE state until the timer is reset. At that point the member will transition to JOINING to rejoin the group.
- If the poll timer expires while the member is already leaving, the poll timer expiration logic does not apply, and just lets the HB continue. Note that this would be the case of a member in PREPARE_LEAVING waiting for callbacks to complete (needs to continue sending HB), or LEAVING (needs to send the last HB to leave).
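Editor's note: the failure mode described in the PR text, where the expiration logic forced a transition without checking whether the member was already leaving, can be illustrated with a validated transition function. Everything here is hypothetical: the class `TransitionCheckSketch`, its methods, and in particular the `ALLOWED_FROM` table, which is invented for the illustration and is not Kafka's actual transition table.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class TransitionCheckSketch {

    // Reduced stand-in for the client's member states (assumption).
    public enum State { STABLE, PREPARE_LEAVING, LEAVING, STALE }

    // Invented table: for each target state, the states it may be entered from.
    private static final Map<State, Set<State>> ALLOWED_FROM = new EnumMap<>(State.class);

    static {
        ALLOWED_FROM.put(State.STABLE, EnumSet.noneOf(State.class));
        ALLOWED_FROM.put(State.PREPARE_LEAVING, EnumSet.of(State.STABLE));
        ALLOWED_FROM.put(State.LEAVING, EnumSet.of(State.STABLE, State.PREPARE_LEAVING));
        ALLOWED_FROM.put(State.STALE, EnumSet.of(State.STABLE));
    }

    // Validated transition: rejects any move the table does not list.
    public static State transition(State current, State target) {
        if (!ALLOWED_FROM.get(target).contains(current)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + target);
        }
        return target;
    }

    // Expiration handling with no guard: always attempts the move to STALE.
    public static State onPollTimerExpiredBlind(State current) {
        return transition(current, State.STALE);
    }

    // Expiration handling with the guard: a member that is already leaving
    // keeps its state and the ongoing leave continues.
    public static State onPollTimerExpiredGuarded(State current) {
        if (current == State.PREPARE_LEAVING || current == State.LEAVING) {
            return current;
        }
        return transition(current, State.STALE);
    }

    public static void main(String[] args) {
        System.out.println("guarded, LEAVING: " + onPollTimerExpiredGuarded(State.LEAVING));
        try {
            onPollTimerExpiredBlind(State.LEAVING);
        } catch (IllegalStateException e) {
            System.out.println("blind, LEAVING: " + e.getMessage());
        }
    }
}
```

The guard is placed before the transition attempt rather than inside the validation table, so the table stays strict and the caller decides when the expiration logic applies.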