cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1562500382


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequest(final long curr
         heartbeatRequestState.onSendAttempt(currentTimeMs);
         membershipManager.onHeartbeatRequestSent();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        // Reset timer when sending the request, to make sure that, if waiting 
for the interval,
+        // we don't include the response time (which may introduce delay)

Review Comment:
   Do we really need this comment?
   Additionally, I could not find a verification of this call in unit tests. 
Since you added a comment it seems to be important enough for a verification. 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
 
+    @Test
+    public void testHeartbeatNotSentIfAnotherOnInFlight() {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        // Heartbeat sent (no response received)
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
+            "previous on in-flight");
+        
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent when the " +
+            "interval expires if there is a previous HB request in-flight");
+
+        // Receive response for the inflight. The next HB should be sent on 
the next poll after
+        // the interval expires.
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+        time.sleep(DEFAULT_RETRY_BACKOFF_MS);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+

Review Comment:
   ```suggestion
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
         assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
     }
 
+    @Test
+    public void testHeartbeatNotSentIfAnotherOnInFlight() {

Review Comment:
   typo
   ```suggestion
       public void testHeartbeatNotSentIfAnotherOneInFlight() {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
             return heartbeatTimer.remainingMs();
         }
 
+        public void onFailedAttempt(final long currentTimeMs) {
+            // Expire timer to allow sending HB after a failure. After a 
failure, a next HB may be
+            // needed with backoff (ex. errors that lead to retries, like 
coordinator load error),
+            // or immediately (ex. errors that lead to rejoining, like fencing 
errors).
+            heartbeatTimer.update(heartbeatTimer.currentTimeMs() + 
heartbeatTimer.remainingMs());

Review Comment:
   What about adding a method to `Timer` that expires the timer without 
updating it with a time point in the future. Alternatively, I think you could 
reset the `Timer` to 0 with `heartbeatTimer.reset(0)`.
   Do we need a verification in the unit tests for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to