AndrewJSchofield commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
         return currentAssignment.equals(currentTargetAssignment);
     }
 
+    /**
+     * @return True if the member should not send heartbeats, which would be one of the following
+     * cases:
+     * <ul>
+     * <li>Member is not subscribed to any topics</li>
+     * <li>Member has received a fatal error in a previous heartbeat response</li>
+     * <li>Member is stale, meaning that it has left the group due to expired poll timer</li>
+     * </ul>
+     */
     @Override
     public boolean shouldSkipHeartbeat() {
         MemberState state = state();
-        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group (waiting for callbacks), or
+     * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+     * the consumer poll timer expires.
+     */
+    public boolean isLeavingGroup() {
+        MemberState state = state();
+        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
     }
 
     /**

Review Comment:
   I think that technically, it doesn't take the *user* to poll in order to
rejoin. The code polls more frequently if the user's poll timeout is long
enough.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
   I think the logic as written is correct.
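
   For reference, the two state checks in the hunks above can be modelled in
isolation. This is only a minimal sketch, not the Kafka classes themselves:
the class name `HeartbeatSkipSketch`, the trimmed-down `MemberState` enum and
the helper `shouldHandlePollTimeout` are hypothetical, introduced purely for
illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical standalone model of the checks in this diff; not the real Kafka classes.
public class HeartbeatSkipSketch {

    // Assumption: a reduced set of states, just enough to exercise both checks.
    enum MemberState { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, FATAL, STALE }

    private static final Set<MemberState> SKIP_STATES =
        EnumSet.of(MemberState.UNSUBSCRIBED, MemberState.FATAL, MemberState.STALE);

    // Mirrors shouldSkipHeartbeat() from the diff: no heartbeat in these states.
    static boolean shouldSkipHeartbeat(MemberState state) {
        return SKIP_STATES.contains(state);
    }

    // Mirrors isLeavingGroup() from the diff: waiting for callbacks or sending the last heartbeat.
    static boolean isLeavingGroup(MemberState state) {
        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
    }

    // Hypothetical helper for the new condition in poll(): react to an expired
    // poll timer only when the member is not already on its way out of the group.
    static boolean shouldHandlePollTimeout(boolean pollTimerExpired, MemberState state) {
        return pollTimerExpired && !isLeavingGroup(state);
    }

    public static void main(String[] args) {
        for (MemberState state : MemberState.values()) {
            System.out.println(state
                + " skipHeartbeat=" + shouldSkipHeartbeat(state)
                + " handleExpiredTimer=" + shouldHandlePollTimeout(true, state));
        }
    }
}
```

   In the actual `poll()` the skip check returns early, so a member that is
already `STALE` never reaches the timer check at all.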



