lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525912448


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
             // MemberEpoch - always sent
             data.setMemberEpoch(membershipManager.memberEpoch());
 
-            // InstanceId - only sent if has changed since the last heartbeat
-            // Always send when leaving the group as a static member
-            membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-                if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-                    data.setInstanceId(groupInstanceId);
-                    sentFields.instanceId = groupInstanceId;
-                }
-            });
+            // InstanceId - always send when leaving the group as a static 
member
+            membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-            // RebalanceTimeoutMs - only sent if has changed since the last 
heartbeat
-            if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+            // RebalanceTimeoutMs - only sent when joining
+            if (membershipManager.memberEpoch() == 0) {

Review Comment:
   Summary of slack discussion: We need to resend a full request after a 
failure, to let the GC know that it needs to send a full response (including 
repeating the assignment). The reason is that if you had a request timeout for 
a heartbeat request, that heartbeat could have been the one with a new 
assignment for you. If you don't send a full request next, the server will 
assume that you've receive that last one so it won't deliver the new assignment 
again. So sending a full request is an implicit "force full response". 
   
   I reverted that part of the changes (so I brought back the rebalanceMs and 
serverAssignor in sentFields). We could consider only using reset on the 
heartbeat state (resetting when transitioning to JOINING may be enough). 
However, that seems more brittle, it’s quite easy to introduce a code path 
where we forget to reset the heartbeat state. Having a contract like “always 
send when epoch==0” for the joining case is easier in my eyes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to