cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1415377837
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
         this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs);
+        this.pollTimer = time.timer(rebalanceTimeoutMs);
     }
     // Visible for testing
     HeartbeatRequestManager(
         final LogContext logContext,
+        final Time time,

Review Comment:
Why not pass in the poll timer directly instead of the mock time, which is only used to create the poll timer? By passing in the poll timer, you can also remove the method `pollTimer()`, since it is only needed for testing.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
         this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs);
+        this.pollTimer = time.timer(rebalanceTimeoutMs);

Review Comment:
Yes, please rename.
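A minimal sketch of the constructor-injection suggestion, under stated assumptions: `PollTimer` is a hypothetical class that only mimics the `update`/`reset`/`isExpired` calls seen in the diff (it is not Kafka's `org.apache.kafka.common.utils.Timer`), and `HeartbeatManagerSketch` is a hypothetical stand-in for `HeartbeatRequestManager`. The production constructor builds the timer from a clock; the test-only constructor receives the timer, so a test can inspect the instance it created without a `pollTimer()` accessor.

```java
import java.util.function.LongSupplier;

// Hypothetical poll timer; mimics the update/reset/isExpired shape from the diff,
// but is NOT Kafka's Timer class.
final class PollTimer {
    private final LongSupplier clock;
    private long currentMs;
    private long deadlineMs;

    PollTimer(LongSupplier clock, long timeoutMs) {
        this.clock = clock;
        reset(timeoutMs);
    }

    // Record the latest time observed by the caller.
    void update(long nowMs) {
        currentMs = nowMs;
    }

    // Re-reads the clock so the new deadline counts from "now".
    void reset(long timeoutMs) {
        currentMs = clock.getAsLong();
        deadlineMs = currentMs + timeoutMs;
    }

    boolean isExpired() {
        return currentMs >= deadlineMs;
    }
}

// Hypothetical manager showing the timer passed in through the constructor.
final class HeartbeatManagerSketch {
    private final PollTimer pollTimer;
    private final long maxPollIntervalMs;

    // Production path: build the timer from a clock.
    HeartbeatManagerSketch(LongSupplier clock, long maxPollIntervalMs) {
        this(new PollTimer(clock, maxPollIntervalMs), maxPollIntervalMs);
    }

    // Visible for testing: the test owns the timer, so no pollTimer() getter is needed.
    HeartbeatManagerSketch(PollTimer pollTimer, long maxPollIntervalMs) {
        this.pollTimer = pollTimer;
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    // Called when the application thread polls: restart the poll interval.
    void ack() {
        pollTimer.reset(maxPollIntervalMs);
    }

    // Background tick: refresh the timer, then report whether it has expired.
    boolean pollTimerExpired(long nowMs) {
        pollTimer.update(nowMs);
        return pollTimer.isExpired();
    }
}
```

In a test, the clock can be a mutable `long[]` read by a lambda, and assertions go against the same `PollTimer` instance the test passed to the constructor.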
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
                 new ConsumerGroupHeartbeatResponseData.Assignment();
         assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
         ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-                .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-                .setMemberId(memberId)
-                .setMemberEpoch(1)
-                .setAssignment(assignmentTopic1));
+            .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+            .setMemberId(memberId)
+            .setMemberEpoch(1)
+            .setAssignment(assignmentTopic1));
         membershipManager.onHeartbeatResponseReceived(rs1.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
+    @Test
+    public void testEnsureLeaveGroupWhenPollTimerExpires() {
+        membershipManager.transitionToJoining();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        // Sending first heartbeat and transitioning to stable
+        assertHeartbeat(heartbeatRequestManager);
+        assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+        // Expires the poll timer, and ensure heartbeat is not sent
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+        assertNoHeartbeat(heartbeatRequestManager);

Review Comment:
As I wrote before, I think the consumer should leave the group.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
+    /**
+     * When consumer polls, we need to reset the pollTimer. If member is already leaving the group
+     */
+    public void ack() {
+        pollTimer.reset(rebalanceTimeoutMs);
+    }
+
+    Timer pollTimer() {

Review Comment:
See my comment above about directly passing the poll timer into the constructor.
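To make the behavior requested for this test concrete, here is a small self-contained model, assuming hypothetical names (`Request`, `FakeMember`, `ack(long)`, `poll(long)`) that are not part of Kafka: once the time since the last application poll reaches the max poll interval, a dynamic member emits a single leave request instead of silently skipping heartbeats, and the next application poll restarts the interval. How the real manager rejoins after leaving is not covered by the review; the `left = false` reset below is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical request kinds; labels only, not Kafka's wire protocol.
enum Request { HEARTBEAT, LEAVE_GROUP }

// Hypothetical dynamic member (no group.instance.id) driven by a manual clock.
final class FakeMember {
    private final long maxPollIntervalMs;
    private long lastPollMs;
    private boolean left;
    final List<Request> sent = new ArrayList<>();

    FakeMember(long maxPollIntervalMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    // Application thread polled: restart the poll interval (assumed to allow rejoining).
    void ack(long nowMs) {
        lastPollMs = nowMs;
        left = false;
    }

    // Background tick: heartbeat, or leave once the poll interval has elapsed.
    void poll(long nowMs) {
        if (left) {
            return; // already left; nothing more to send until the next ack
        }
        if (nowMs - lastPollMs >= maxPollIntervalMs) {
            sent.add(Request.LEAVE_GROUP);
            left = true;
        } else {
            sent.add(Request.HEARTBEAT);
        }
    }
}
```

Under this model, a test shaped like `testEnsureLeaveGroupWhenPollTimerExpires` would check for a leave request after the poll interval elapses, rather than only asserting that no heartbeat is sent.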
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -166,11 +175,23 @@ public HeartbeatRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
+        if (!coordinatorRequestManager.coordinator().isPresent() ||
+            membershipManager.shouldSkipHeartbeat() ||
+            pollTimer.isExpired()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired()) {
+            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + "was longer than the configured max.poll.interval.ms, which typically implies that " + "the poll loop is spending too much time processing messages. You can address this " + "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + "returned in poll() with max.poll.records.");
+            return PollResult.EMPTY;

Review Comment:
What @lianetm writes makes sense to me. Looking at the [code of the legacy consumer](https://github.com/apache/kafka/blob/bbcf40ad0dc7739d803244bbbfbaa4598850344b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1491), the consumer should proactively leave the group instead of stopping heartbeats and waiting until it is kicked out of the group. However, if `group.instance.id` is set, it should not leave the group; it should only stop heartbeating.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
+    /**
+     * When consumer polls, we need to reset the pollTimer. If member is already leaving the group
+     */
+    public void ack() {

Review Comment:
+1

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
      */
     private final BackgroundEventHandler backgroundEventHandler;
+    /**
+     * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
+     * sending heartbeat until the next poll.
+     */

Review Comment:
Isn't this only true when `group.instance.id` is set? That is my interpretation of the following doc: https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
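The review comments above on `group.instance.id` point to a branch on static membership when the poll timer expires. Per the `max.poll.interval.ms` documentation, a consumer with a non-null `group.instance.id` stops sending heartbeats and its partitions are reassigned only after `session.timeout.ms`, while a dynamic member should leave proactively, as the legacy consumer does. A minimal sketch of that decision, assuming a hypothetical `HeartbeatAction` enum and `PollTimeoutPolicy.decide` helper (neither is a Kafka API), and assuming the caller has already refreshed the poll timer with the current time before asking whether it expired:

```java
import java.util.Optional;

// Hypothetical outcomes of one background poll; labels only.
enum HeartbeatAction { SKIP, SEND_HEARTBEAT, SEND_LEAVE_GROUP, STOP_HEARTBEATING }

final class PollTimeoutPolicy {
    private PollTimeoutPolicy() {
    }

    // Assumes pollTimerExpired was computed after updating the timer with the current time.
    static HeartbeatAction decide(boolean coordinatorKnown,
                                  boolean pollTimerExpired,
                                  Optional<String> groupInstanceId) {
        if (!coordinatorKnown) {
            return HeartbeatAction.SKIP;
        }
        if (pollTimerExpired) {
            // Static member: stop heartbeating; session.timeout.ms governs reassignment.
            // Dynamic member: leave the group proactively.
            return groupInstanceId.isPresent()
                    ? HeartbeatAction.STOP_HEARTBEATING
                    : HeartbeatAction.SEND_LEAVE_GROUP;
        }
        return HeartbeatAction.SEND_HEARTBEAT;
    }
}
```

Keeping the decision in a pure function like this makes the static and dynamic cases easy to cover with separate unit tests, independent of the network and membership plumbing.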