lucasbru commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1414200098


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
 
+    /**
+     * When consumer polls, we need to reset the pollTimer.  If member is 
already leaving the group
+     */
+    public void ack() {

Review Comment:
   Is the timer up to date at this point? Otherwise, `deadlineMs` will be set 
incorrectly. Maybe pass in the current time into this function and update the 
timer before setting the timeout. Like we do for auto-commit.
   
   Also, better name please. `resetPollTimer` ?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
         this.heartbeatState = new HeartbeatState(subscriptions, 
membershipManager, rebalanceTimeoutMs);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
             retryBackoffMaxMs, rebalanceTimeoutMs);
+        this.pollTimer = time.timer(rebalanceTimeoutMs);

Review Comment:
   rebalanceTimeoutMs == maxPollIntervalMs ? maybe rename in that case.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
             new ConsumerGroupHeartbeatResponseData.Assignment();
         
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
         ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-                .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-                .setMemberId(memberId)
-                .setMemberEpoch(1)
-                .setAssignment(assignmentTopic1));
+            .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+            .setMemberId(memberId)
+            .setMemberEpoch(1)
+            .setAssignment(assignmentTopic1));
         membershipManager.onHeartbeatResponseReceived(rs1.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
+    @Test
+    public void testEnsureLeaveGroupWhenPollTimerExpires() {
+        membershipManager.transitionToJoining();

Review Comment:
   You could consider using `membershipManager.transitionToFenced(); // And 
indirect way of moving to JOINING state` instead of changing the visibility of 
prod code



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -428,9 +428,9 @@ private void updateSubscription(Collection<TopicPartition> 
assignedPartitions,
      * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
      * try to join the group on the next heartbeat request. This is expected 
to be invoked when
      * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
-     * Visible for testing.

Review Comment:
   I think it's still visible for testing only



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
         return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
     }
 
+    /**
+     * When consumer polls, we need to reset the pollTimer.  If member is 
already leaving the group
+     */
+    public void ack() {
+        pollTimer.reset(rebalanceTimeoutMs);
+    }
+
+    Timer pollTimer() {

Review Comment:
   // visible for testing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to