kirktrue commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1567992301


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
 
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Reset timer to allow sending HB after a failure without waiting 
for the interval.
+            // After a failure, a next HB may be needed with backoff (ex. 
errors that lead to
+            // retries, like coordinator load error), or immediately (ex. 
errors that lead to
+            // rejoining, like fencing errors).
+            heartbeatTimer.reset(0);
+            super.onFailedAttempt(currentTimeMs);

Review Comment:
   Does the decision to reset the heartbeat timer depend on what _type_ of 
error is received?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -380,7 +380,7 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
                 break;
 
             case UNRELEASED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to the instance 
id {} was not released: {}",
+                logger.error("GroupHeartbeatRequest failed due to unreleased 
instance id {}: {}",

Review Comment:
   QQ: are these logging changes of the ‘I'll just clean this up as long as I'm 
in here?’ variety, or dow it have some bearing on the correctness of the logs?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
 
+    @Test
+    public void testHeartbeatNotSentIfAnotherOneInFlight() {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        // Heartbeat sent (no response received)
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
+            "previous on in-flight");

Review Comment:
   Super nit-picky, sorry 😞 
   
   ```suggestion
               "previous one in-flight");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to