lucasbru commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1414200098
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
return heartbeatNow ? 0L :
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
}
+ /**
+ * When consumer polls, we need to reset the pollTimer. If member is
already leaving the group
+ */
+ public void ack() {
Review Comment:
Is the timer up to date at this point? If not, `deadlineMs` will be set
incorrectly. Consider passing the current time into this function and updating
the timer before resetting the timeout, as we do for auto-commit.
Also, please use a more descriptive name, e.g. `resetPollTimer`.
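To illustrate the concern, here is a minimal sketch. `SimpleTimer` is a
hypothetical stand-in, not Kafka's `Timer`; it assumes the timer computes a new
deadline from its cached current time rather than from the real clock.
`resetPollTimer(...)` only shows the suggested shape and is not existing code.

```java
// Hypothetical stand-in for a timer that caches the current time.
final class SimpleTimer {
    private long currentTimeMs;
    private long deadlineMs;

    SimpleTimer(long nowMs, long timeoutMs) {
        this.currentTimeMs = nowMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    // Refresh the cached time; time never moves backwards.
    void update(long nowMs) {
        this.currentTimeMs = Math.max(this.currentTimeMs, nowMs);
    }

    // The new deadline is relative to the cached time, not the real clock.
    void reset(long timeoutMs) {
        this.deadlineMs = this.currentTimeMs + timeoutMs;
    }

    long deadlineMs() {
        return deadlineMs;
    }
}

final class PollTimerSketch {
    // Reset without refreshing: the deadline is based on a stale timestamp.
    static long staleReset(SimpleTimer timer, long timeoutMs) {
        timer.reset(timeoutMs);
        return timer.deadlineMs();
    }

    // Suggested shape: refresh the timer first, then reset the timeout.
    static long resetPollTimer(SimpleTimer timer, long currentTimeMs, long timeoutMs) {
        timer.update(currentTimeMs);
        timer.reset(timeoutMs);
        return timer.deadlineMs();
    }

    public static void main(String[] args) {
        long timeoutMs = 300_000L;
        long nowMs = 10_000L; // the clock has advanced since the timers were created
        SimpleTimer stale = new SimpleTimer(0L, timeoutMs);
        SimpleTimer fresh = new SimpleTimer(0L, timeoutMs);
        System.out.println(staleReset(stale, timeoutMs));            // prints 300000
        System.out.println(resetPollTimer(fresh, nowMs, timeoutMs)); // prints 310000
    }
}
```

In this sketch the stale reset yields a deadline 10 seconds too early, which
would make the poll timer expire sooner than the configured timeout.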
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -121,11 +127,13 @@ public HeartbeatRequestManager(
this.heartbeatState = new HeartbeatState(subscriptions,
membershipManager, rebalanceTimeoutMs);
this.heartbeatRequestState = new HeartbeatRequestState(logContext,
time, 0, retryBackoffMs,
retryBackoffMaxMs, rebalanceTimeoutMs);
+ this.pollTimer = time.timer(rebalanceTimeoutMs);
Review Comment:
Is `rebalanceTimeoutMs` the same as `maxPollIntervalMs` here? If so, consider
renaming it.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
new ConsumerGroupHeartbeatResponseData.Assignment();
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
ConsumerGroupHeartbeatResponse rs1 = new
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
- .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setAssignment(assignmentTopic1));
+ .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setAssignment(assignmentTopic1));
membershipManager.onHeartbeatResponseReceived(rs1.data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
+ @Test
+ public void testEnsureLeaveGroupWhenPollTimerExpires() {
+ membershipManager.transitionToJoining();
Review Comment:
Consider using `membershipManager.transitionToFenced(); // An
indirect way of moving to JOINING state` instead of changing the visibility of
prod code.
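A rough sketch of the idea, using a hypothetical toy state machine rather than
the real `MembershipManagerImpl`. It assumes the fenced transition ends by
rejoining, so a test can reach JOINING without the private method being
exposed.

```java
// Hypothetical, heavily simplified member state machine for illustration only.
final class ToyMembershipManager {
    enum State { UNSUBSCRIBED, FENCED, JOINING }

    private State state = State.UNSUBSCRIBED;

    State state() {
        return state;
    }

    // Stays private: tests never call it directly.
    private void transitionToJoining() {
        state = State.JOINING;
    }

    // Already accessible entry point; assumed here to end by rejoining the group.
    void transitionToFenced() {
        state = State.FENCED;
        transitionToJoining();
    }
}

final class ToyMembershipManagerCheck {
    public static void main(String[] args) {
        ToyMembershipManager manager = new ToyMembershipManager();
        // Drive the member into JOINING through the existing entry point.
        manager.transitionToFenced();
        if (manager.state() != ToyMembershipManager.State.JOINING) {
            throw new AssertionError("expected JOINING but was " + manager.state());
        }
        System.out.println(manager.state()); // prints JOINING
    }
}
```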
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -428,9 +428,9 @@ private void updateSubscription(Collection<TopicPartition>
assignedPartitions,
* Transition to the {@link MemberState#JOINING} state, indicating that
the member will
* try to join the group on the next heartbeat request. This is expected
to be invoked when
* the user calls the subscribe API, or when the member wants to rejoin
after getting fenced.
- * Visible for testing.
Review Comment:
I think it's still visible for testing only, so this note should stay.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
return heartbeatNow ? 0L :
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
}
+ /**
+ * When consumer polls, we need to reset the pollTimer. If member is
already leaving the group
+ */
+ public void ack() {
+ pollTimer.reset(rebalanceTimeoutMs);
+ }
+
+ Timer pollTimer() {
Review Comment:
Please add a `// visible for testing` comment here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.