vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383612141


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -898,31 +987,44 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
-        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
-        // delta between the existing and the new target assignment is persisted to the partition.
+        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
         int targetAssignmentEpoch = group.assignmentEpoch();
         Assignment targetAssignment = group.targetAssignment(memberId);
-        if (groupEpoch > targetAssignmentEpoch) {
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
             String preferredServerAssignor = group.computePreferredServerAssignor(
                 member,
                 updatedMember
             ).orElse(defaultAssignor.name());
 
             try {
-                TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
-                    new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+                TargetAssignmentBuilder assignmentResultBuilder =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+                TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+                // A new static member is replacing an older one with the same subscriptions.
+                // We just need to remove the older member and add the newer one. The new member can
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {
+                    targetAssignment = group.targetAssignment(existingStaticMember.memberId());
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(existingStaticMember.memberId())
+                        .addOrUpdateMember(memberId, updatedMember)
+                        .build();
+                    records.addAll(assignmentResult.records());

Review Comment:
   I have added members and target assignment.
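
   For readers following the hunk above, the replacement path (remove the older static member, add the newer one, reuse the older member's target assignment) can be sketched in isolation. This is only a rough illustration under stated assumptions: the class `StaticMemberSwapSketch`, the method `replaceStaticMember`, and the plain `Map` standing in for the target assignment are all hypothetical and are not the actual `TargetAssignmentBuilder` API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka code: a map of member id -> partitions
// stands in for the group's target assignment.
final class StaticMemberSwapSketch {

    // Returns a copy of the target assignment in which the old static member
    // is removed and the new member inherits its partitions unchanged.
    static Map<String, List<Integer>> replaceStaticMember(
            Map<String, List<Integer>> targetAssignment,
            String oldMemberId,
            String newMemberId) {
        Map<String, List<Integer>> updated = new HashMap<>(targetAssignment);
        // Drop the departing member and keep whatever it was assigned.
        List<Integer> reused = updated.remove(oldMemberId);
        // The replacement reuses that assignment (empty if there was none).
        updated.put(newMemberId, reused == null ? List.of() : reused);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> target = new HashMap<>();
        target.put("member-a", List.of(0, 1));
        target.put("member-b", List.of(2));
        Map<String, List<Integer>> result =
            replaceStaticMember(target, "member-a", "member-a2");
        System.out.println(result.get("member-a2"));
    }
}
```

   The sketch copies the map rather than mutating it. That is a choice made for the illustration only; the hunk above instead collects the changes as records via `assignmentResult.records()`.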



