ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r2029654165
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -211,6 +214,29 @@ public ConsumerMembershipManager membershipManager() {
return membershipManager;
}
+ @Override
+ protected boolean shouldSendLeaveHeartbeatNow() {
+ // If the consumer has dynamic membership,
+ // we should skip the leaving heartbeat when leaveGroupOperation is REMAIN_IN_GROUP
+ if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP == membershipManager.leaveGroupOperation())
+ return false;
+ return membershipManager().state() == MemberState.LEAVING;
+ }
+
+ @Override
+ public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+ // Determine if we should send a leaving heartbeat:
+ // - For static membership (when groupInstanceId is present): Always send the leaving heartbeat
+ // - For dynamic membership: Send the leaving heartbeat only when leaveGroupOperation is not REMAIN_IN_GROUP
+ boolean shouldHeartbeat = membershipManager.groupInstanceId().isPresent()
Review Comment:
Also, from a quick look at the `ConsumerMembershipManager#isLeavingGroup`
implementation, it already checks both of these conditions
(`groupInstanceId.isPresent` and `REMAIN_IN_GROUP ==
membershipManager.leaveGroupOperation()`). IMO it would be cleaner and easier
to read if we encapsulated all the logic around these conditions inside
the `#isLeavingGroup` method, so that the heartbeat request manager only has
to ask the membership manager a single question.
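
To make the suggestion concrete, here is a rough, self-contained sketch of what I have in mind. It is not the real Kafka code: `MembershipSketch`, its fields, and the two enums are hypothetical stand-ins, and the body of `isLeavingGroup` only mirrors the condition in the `shouldSendLeaveHeartbeatNow` hunk above, not the actual `ConsumerMembershipManager` implementation.

```java
import java.util.Optional;

// Hypothetical stand-in types for illustration only; not the Kafka classes.
final class LeaveHeartbeatSketch {

    enum MemberState { STABLE, LEAVING }

    enum LeaveGroupOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Minimal stand-in for the membership manager (assumed shape).
    static final class MembershipSketch {
        private final Optional<String> groupInstanceId;
        private final LeaveGroupOperation leaveGroupOperation;
        private final MemberState state;

        MembershipSketch(Optional<String> groupInstanceId,
                         LeaveGroupOperation leaveGroupOperation,
                         MemberState state) {
            this.groupInstanceId = groupInstanceId;
            this.leaveGroupOperation = leaveGroupOperation;
            this.state = state;
        }

        // All of the "should we announce that we are leaving?" logic lives here.
        boolean isLeavingGroup() {
            // Dynamic member (no group instance id) that was asked to remain
            // in the group: never send the leave heartbeat.
            boolean dynamicAndRemaining = !groupInstanceId.isPresent()
                    && leaveGroupOperation == LeaveGroupOperation.REMAIN_IN_GROUP;
            return !dynamicAndRemaining && state == MemberState.LEAVING;
        }
    }

    // The request manager side then collapses to a single delegation.
    static boolean shouldSendLeaveHeartbeatNow(MembershipSketch membership) {
        return membership.isLeavingGroup();
    }

    public static void main(String[] args) {
        MembershipSketch dynamicRemain = new MembershipSketch(
                Optional.empty(), LeaveGroupOperation.REMAIN_IN_GROUP, MemberState.LEAVING);
        MembershipSketch staticRemain = new MembershipSketch(
                Optional.of("instance-1"), LeaveGroupOperation.REMAIN_IN_GROUP, MemberState.LEAVING);
        System.out.println(shouldSendLeaveHeartbeatNow(dynamicRemain));
        System.out.println(shouldSendLeaveHeartbeatNow(staticRemain));
    }
}
```

With something along these lines, `pollOnClose` could reuse the same method instead of re-deriving `shouldHeartbeat` from `groupInstanceId` and `leaveGroupOperation` a second time.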