cadonna commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1415377837


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
         this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
             retryBackoffMaxMs, rebalanceTimeoutMs);
+        this.pollTimer = time.timer(rebalanceTimeoutMs);
     }
 
     // Visible for testing
     HeartbeatRequestManager(
         final LogContext logContext,
+        final Time time,

Review Comment:
   Why not pass in the poll timer directly instead of the mock time, which is 
   only used to create the poll timer? By passing in the poll timer, you can also 
   remove the method `pollTimer()`, since it is only needed for testing.
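
A minimal sketch of the constructor-injection idea suggested above. It does not use Kafka's classes: `SketchTimer`, `SketchHeartbeatManager`, `shouldHeartbeat`, and `resetPollTimer` are hypothetical stand-ins introduced only for illustration. The point is that a test which creates the timer itself already holds a reference to it, so no test-only accessor is needed on the manager.

```java
/** Demo entry point; every name in this file is hypothetical, not Kafka's API. */
final class PollTimerInjectionDemo {
    public static void main(String[] args) {
        // The caller (for example, a test) creates the timer and keeps the reference.
        SketchTimer pollTimer = new SketchTimer(0L, 100L);
        SketchHeartbeatManager manager = new SketchHeartbeatManager(pollTimer, 100L);

        System.out.println("heartbeat allowed at 50 ms: " + manager.shouldHeartbeat(50L));
        System.out.println("heartbeat allowed at 100 ms: " + manager.shouldHeartbeat(100L));
        // The timer can be inspected directly, without an accessor on the manager.
        System.out.println("timer expired: " + pollTimer.isExpired());
    }
}

/** Simplified stand-in for a timer driven by explicitly supplied timestamps. */
final class SketchTimer {
    private long currentTimeMs;
    private long deadlineMs;

    SketchTimer(long nowMs, long timeoutMs) {
        reset(nowMs, timeoutMs);
    }

    /** Advances the timer's view of the current time; time never moves backwards. */
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    /** Restarts the timer so that it expires timeoutMs after nowMs. */
    void reset(long nowMs, long timeoutMs) {
        currentTimeMs = nowMs;
        deadlineMs = nowMs + timeoutMs;
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }
}

/** Hypothetical manager that receives its poll timer through the constructor. */
final class SketchHeartbeatManager {
    private final SketchTimer pollTimer;
    private final long maxPollIntervalMs;

    SketchHeartbeatManager(SketchTimer pollTimer, long maxPollIntervalMs) {
        this.pollTimer = pollTimer;
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    /** Returns true if the poll timer has not expired as of nowMs. */
    boolean shouldHeartbeat(long nowMs) {
        pollTimer.update(nowMs);
        return !pollTimer.isExpired();
    }

    /** Called when the application polls again. */
    void resetPollTimer(long nowMs) {
        pollTimer.reset(nowMs, maxPollIntervalMs);
    }
}
```

In a real test the injected object would presumably be the timer created from the mock time, with the production constructor still building its own timer internally.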



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
         this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
             retryBackoffMaxMs, rebalanceTimeoutMs);
+        this.pollTimer = time.timer(rebalanceTimeoutMs);

Review Comment:
   Yes, please, rename.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
             new ConsumerGroupHeartbeatResponseData.Assignment();
         assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
         ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-                .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-                .setMemberId(memberId)
-                .setMemberEpoch(1)
-                .setAssignment(assignmentTopic1));
+            .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+            .setMemberId(memberId)
+            .setMemberEpoch(1)
+            .setAssignment(assignmentTopic1));
         membershipManager.onHeartbeatResponseReceived(rs1.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
+    @Test
+    public void testEnsureLeaveGroupWhenPollTimerExpires() {
+        membershipManager.transitionToJoining();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        // Sending first heartbeat and transitioning to stable
+        assertHeartbeat(heartbeatRequestManager);
+        assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+        // Expires the poll timer, and ensure heartbeat is not sent
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+        assertNoHeartbeat(heartbeatRequestManager);

Review Comment:
   As I wrote before, I think the consumer should leave the group.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
 
+    /**
+     * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+     */
+    public void ack() {
+        pollTimer.reset(rebalanceTimeoutMs);
+    }
+
+    Timer pollTimer() {

Review Comment:
   See my comment above about directly passing the poll timer into the 
constructor. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -166,11 +175,23 @@ public HeartbeatRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
+        if (!coordinatorRequestManager.coordinator().isPresent() ||
+            membershipManager.shouldSkipHeartbeat() ||
+            pollTimer.isExpired()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
 
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired()) {
+            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+                "was longer than the configured max.poll.interval.ms, which typically implies that " +
+                "the poll loop is spending too much time processing messages. You can address this " +
+                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+                "returned in poll() with max.poll.records.");
+            return PollResult.EMPTY;

Review Comment:
   What @lianetm writes makes sense to me. Looking at the
   [code of the legacy consumer](https://github.com/apache/kafka/blob/bbcf40ad0dc7739d803244bbbfbaa4598850344b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1491),
   the consumer should proactively leave the group rather than stop heartbeating 
   and wait until it is kicked out of the group.
   However, if `group.instance.id` is set, it should not leave the group; it 
   should only stop heartbeating.
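
A minimal sketch of the rule described in this comment, under the assumption that the decision depends only on whether `group.instance.id` is configured. `ExpiryAction` and `PollExpiryPolicy` are hypothetical names introduced for illustration; they are not part of Kafka, and this is not how the PR or the legacy consumer is actually structured.

```java
import java.util.Optional;

/** Demo entry point; every name in this file is hypothetical, not Kafka's API. */
final class PollExpiryDemo {
    public static void main(String[] args) {
        System.out.println("dynamic member: "
            + PollExpiryPolicy.onPollTimerExpired(Optional.empty()));
        System.out.println("static member: "
            + PollExpiryPolicy.onPollTimerExpired(Optional.of("instance-1")));
    }
}

/** What the consumer does once the poll timer has expired. */
enum ExpiryAction {
    /** Proactively leave the group instead of waiting to be kicked out. */
    LEAVE_GROUP,
    /** Stay in the group but stop sending heartbeats until the next poll. */
    STOP_HEARTBEATING
}

final class PollExpiryPolicy {
    private PollExpiryPolicy() { }

    /**
     * Assumed rule: a member with group.instance.id set (a static member)
     * only stops heartbeating; any other member leaves the group.
     */
    static ExpiryAction onPollTimerExpired(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
            ? ExpiryAction.STOP_HEARTBEATING
            : ExpiryAction.LEAVE_GROUP;
    }
}
```

Keeping the decision in one small function would let a test cover both the static and the dynamic case without driving the whole heartbeat loop.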



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
 
+    /**
+     * When consumer polls, we need to reset the pollTimer.  If member is already leaving the group
+     */
+    public void ack() {

Review Comment:
   +1



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
      */
     private final BackgroundEventHandler backgroundEventHandler;
 
+    /**
+     * Timer for tracking the time since the last consumer poll.  If the timer expires, the consumer will stop
+     * sending heartbeat until the next poll.
+     */

Review Comment:
   Isn't this only true when `group.instance.id` is set? That is my 
   interpretation of the following doc: 
   https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
