AndrewJSchofield commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
         return currentAssignment.equals(currentTargetAssignment);
     }
 
+    /**
+     * @return True if the member should not send heartbeats, which would be one of the following
+     * cases:
+     * <ul>
+     * <li>Member is not subscribed to any topics</li>
+     * <li>Member has received a fatal error in a previous heartbeat response</li>
+     * <li>Member is stale, meaning that it has left the group due to expired poll timer</li>
+     * </ul>
+     */
     @Override
     public boolean shouldSkipHeartbeat() {
         MemberState state = state();
-        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group (waiting for callbacks), or
+     * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+     * the consumer poll timer expires.
+     */
+    public boolean isLeavingGroup() {
+        MemberState state = state();
+        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
     }
 
     /**

Review Comment:
I think that, technically, it isn't the *user* who has to poll in order to rejoin. The code polls more frequently if the user's poll timeout is long enough.
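To make the effect of the two predicates in this hunk easier to check, here is a minimal, hypothetical sketch that models them as state sets. `MemberStateSketch` and its nested `State` enum are illustrative names, not Kafka client APIs; only the states named in the diff are modelled, plus `STABLE` as an example of an active member (the real `MemberState` enum has more values).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the predicates in the MembershipManagerImpl hunk.
// Not the Kafka implementation; only the states named in the diff (plus STABLE) exist here.
final class MemberStateSketch {

    enum State { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, STALE, FATAL }

    // Mirrors the revised shouldSkipHeartbeat(): no heartbeats at all in these states.
    private static final Set<State> SKIP_HEARTBEAT =
        EnumSet.of(State.UNSUBSCRIBED, State.FATAL, State.STALE);

    // Mirrors isLeavingGroup(): waiting for callbacks, or sending the last heartbeat.
    private static final Set<State> LEAVING_GROUP =
        EnumSet.of(State.PREPARE_LEAVING, State.LEAVING);

    private MemberStateSketch() {}

    static boolean shouldSkipHeartbeat(State state) {
        return SKIP_HEARTBEAT.contains(state);
    }

    static boolean isLeavingGroup(State state) {
        return LEAVING_GROUP.contains(state);
    }
}
```

Modelled this way, the two sets are disjoint: a `STALE` member has already left the group, so it skips heartbeats entirely, while a `PREPARE_LEAVING` or `LEAVING` member is still on its way out (the `LEAVING` member still has its last heartbeat to send) and therefore does not skip.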
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
I think the logic as written is correct.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
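For reference, the decision order of the revised `poll()` in the HeartbeatRequestManager.java hunk can be sketched as a pure function. This is a hypothetical model, not the Kafka code: `HeartbeatPollSketch`, `State`, `Action` and `decide` are illustrative names, the expired flag stands for `pollTimer.isExpired()` after `pollTimer.update(currentTimeMs)`, and the `LEAVE_ON_POLL_TIMEOUT` outcome assumes, from the `isLeavingGroup()` Javadoc, that the warning is followed by proactively leaving the group (that part of the method is not shown in the diff).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the decision order in the revised poll(); not Kafka client APIs.
final class HeartbeatPollSketch {

    enum State { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, STALE, FATAL }

    enum Action {
        SKIP,                  // early exit: no heartbeat this round
        LEAVE_ON_POLL_TIMEOUT, // log the warning and (assumed) proactively leave the group
        PROCEED                // fall through to the normal heartbeat logic
    }

    private static final Set<State> SKIP_STATES =
        EnumSet.of(State.UNSUBSCRIBED, State.FATAL, State.STALE);
    private static final Set<State> LEAVING_STATES =
        EnumSet.of(State.PREPARE_LEAVING, State.LEAVING);

    private HeartbeatPollSketch() {}

    static Action decide(boolean coordinatorKnown, State state, boolean pollTimerExpired) {
        // 1. The expired poll timer is no longer part of the early-exit check;
        //    a member that already left because of it is STALE and is caught here.
        if (!coordinatorKnown || SKIP_STATES.contains(state)) {
            return Action.SKIP;
        }
        // 2. Only a member that is not already leaving reacts to the expired timer,
        //    so an in-progress leave is not started a second time.
        if (pollTimerExpired && !LEAVING_STATES.contains(state)) {
            return Action.LEAVE_ON_POLL_TIMEOUT;
        }
        return Action.PROCEED;
    }
}
```

Under these assumptions, a `STABLE` member with an expired timer yields `LEAVE_ON_POLL_TIMEOUT`, the same member after it has become `STALE` yields `SKIP`, and a `LEAVING` member with an expired timer yields `PROCEED`, so its leave sequence continues uninterrupted.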