vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402215662


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -898,31 +1000,45 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
-        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
-        // delta between the existing and the new target assignment is persisted to the partition.
+        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
         int targetAssignmentEpoch = group.assignmentEpoch();
         Assignment targetAssignment = group.targetAssignment(memberId);
-        if (groupEpoch > targetAssignmentEpoch) {
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
             String preferredServerAssignor = group.computePreferredServerAssignor(
                 member,
                 updatedMember
             ).orElse(defaultAssignor.name());
 
             try {
-                TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
-                    new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+                TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+                // A new static member is replacing an older one with the same subscriptions.
+                // We just need to remove the older member and add the newer one. The new member can
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {

Review Comment:
   Actually, `groupEpoch == targetAssignmentEpoch` is not needed. I was just
trying to ensure that the group epoch and the target assignment epoch are the
same, which is what will happen when a static member is replaced. So, in a way,
it's redundant. I will remove it.
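
   To illustrate the reasoning, here is a minimal standalone sketch. The class
and method names are hypothetical and this is not the PR code. It compares the
inner condition with and without the epoch check. The two only disagree when a
static member is replaced while the group epoch is ahead of the target
assignment epoch, a case that, per the reasoning above, is assumed not to occur
on a plain replacement.

```java
// Hypothetical, self-contained sketch; not the actual GroupMetadataManager code.
final class EpochCheckSketch {

    // Inner branch condition as written in the diff.
    static boolean reuseWithEpochCheck(boolean staticMemberReplaced, int groupEpoch, int targetAssignmentEpoch) {
        return staticMemberReplaced && groupEpoch == targetAssignmentEpoch;
    }

    // Inner branch condition after dropping the epoch comparison.
    static boolean reuseWithoutEpochCheck(boolean staticMemberReplaced) {
        return staticMemberReplaced;
    }

    public static void main(String[] args) {
        int targetAssignmentEpoch = 5; // arbitrary value for illustration
        for (boolean replaced : new boolean[] {false, true}) {
            // Assumption: the group epoch never trails the target assignment epoch,
            // so only "equal" and "ahead by one" are enumerated here.
            for (int groupEpoch = targetAssignmentEpoch; groupEpoch <= targetAssignmentEpoch + 1; groupEpoch++) {
                boolean before = reuseWithEpochCheck(replaced, groupEpoch, targetAssignmentEpoch);
                boolean after = reuseWithoutEpochCheck(replaced);
                System.out.printf("replaced=%b groupEpoch=%d before=%b after=%b%n",
                    replaced, groupEpoch, before, after);
            }
        }
    }
}
```

   If a replacement could ever coincide with a group epoch bump, the simplified
condition would take the reuse path where the original one would not, so that
assumption is worth confirming before removing the check.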



