kirktrue commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1567992301
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
         return heartbeatTimer.remainingMs();
     }
 
+    public void onFailedAttempt(final long currentTimeMs) {
+        // Reset timer to allow sending HB after a failure without waiting for the interval.
+        // After a failure, a next HB may be needed with backoff (ex. errors that lead to
+        // retries, like coordinator load error), or immediately (ex. errors that lead to
+        // rejoining, like fencing errors).
+        heartbeatTimer.reset(0);
+        super.onFailedAttempt(currentTimeMs);

Review Comment:
Does the decision to reset the heartbeat timer depend on what _type_ of error is received?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                 break;
 
             case UNRELEASED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
+                logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",

Review Comment:
QQ: are these logging changes of the ‘I'll just clean this up as long as I'm in here’ variety, or do they have some bearing on the correctness of the logs?
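To make the question about error types concrete, here is a minimal, self-contained model of the `onFailedAttempt` change. It is not Kafka's real `HeartbeatRequestState`; the class `HeartbeatScheduleSketch` and its fields are hypothetical. It assumes a heartbeat may go out only once both the interval timer and any retry backoff have expired, so resetting the interval timer to 0 on failure leaves the backoff as the only gate: a retriable error waits out its backoff, while an error modelled with zero backoff (as a rejoin after fencing might be) can be sent immediately.

```java
/**
 * Hypothetical model of the send decision; not Kafka's real classes.
 * A heartbeat may be sent only when BOTH the interval timer and the
 * retry backoff have expired.
 */
class HeartbeatScheduleSketch {
    private final long intervalMs;
    private long intervalDeadlineMs; // when the regular heartbeat interval expires
    private long backoffDeadlineMs;  // when the retry backoff (if any) expires

    HeartbeatScheduleSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.intervalDeadlineMs = nowMs + intervalMs;
        this.backoffDeadlineMs = nowMs;
    }

    /** After a successful response: wait a full interval again, no backoff. */
    void onSuccess(long nowMs) {
        intervalDeadlineMs = nowMs + intervalMs;
        backoffDeadlineMs = nowMs;
    }

    /**
     * After a failed attempt: the interval timer is reset to 0 (mirroring
     * heartbeatTimer.reset(0) in the diff), so only backoffMs decides when
     * the next heartbeat may be sent. backoffMs == 0 means "send immediately".
     */
    void onFailedAttempt(long nowMs, long backoffMs) {
        intervalDeadlineMs = nowMs;
        backoffDeadlineMs = nowMs + backoffMs;
    }

    boolean canSend(long nowMs) {
        return nowMs >= intervalDeadlineMs && nowMs >= backoffDeadlineMs;
    }

    public static void main(String[] args) {
        HeartbeatScheduleSketch s = new HeartbeatScheduleSketch(3000, 0);

        // Retriable error (e.g. coordinator loading): wait out a 100 ms backoff.
        s.onFailedAttempt(500, 100);
        System.out.println(s.canSend(550)); // false: still backing off
        System.out.println(s.canSend(600)); // true: well before the 3000 ms interval

        // Error modelled with no backoff (e.g. rejoin after fencing).
        s.onFailedAttempt(700, 0);
        System.out.println(s.canSend(700)); // true: sent immediately
    }
}
```

In this model the error type only influences the backoff passed in, not whether the interval timer is reset; whether the real code should behave that way is exactly what the review comment asks.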
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
 
+    @Test
+    public void testHeartbeatNotSentIfAnotherOneInFlight() {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        // Heartbeat sent (no response received)
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+                "previous on in-flight");

Review Comment:
Super nit-picky, sorry 😞

```suggestion
                "previous one in-flight");
```
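The new test checks that a second `poll()` produces no request while the first heartbeat is still unanswered. The gating it exercises can be sketched with a simple in-flight flag; `InFlightGateSketch` and its methods are hypothetical illustrations, not the real `HeartbeatRequestManager` or `NetworkClientDelegate` API.

```java
import java.util.Optional;

/**
 * Hypothetical model of "don't send a heartbeat while another is in flight".
 * Names are illustrative only; not Kafka's real API.
 */
class InFlightGateSketch {
    private final long intervalMs;
    private long nextDueMs;
    private boolean requestInFlight = false;

    InFlightGateSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDueMs = nowMs + intervalMs;
    }

    /** Returns a request label if a heartbeat should be sent now, otherwise empty. */
    Optional<String> poll(long nowMs) {
        if (requestInFlight || nowMs < nextDueMs) {
            return Optional.empty(); // still waiting on a response, or interval not due
        }
        requestInFlight = true;
        return Optional.of("heartbeat@" + nowMs);
    }

    /** Called when a successful response arrives: clear the flag, restart the interval. */
    void onSuccessfulResponse(long nowMs) {
        requestInFlight = false;
        nextDueMs = nowMs + intervalMs;
    }

    public static void main(String[] args) {
        InFlightGateSketch gate = new InFlightGateSketch(3000, 0);
        System.out.println(gate.poll(3000).isPresent()); // true: first heartbeat sent
        System.out.println(gate.poll(3000).isPresent()); // false: previous one in flight
        gate.onSuccessfulResponse(3100);
        System.out.println(gate.poll(6100).isPresent()); // true: next interval elapsed
    }
}
```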