chickenchickenlove commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3281593230


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4485,6 +4723,42 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Write records to replace the old member by the new member.
+     *
+     * @param records   The list of records to append to.
+     * @param group     The streams group.
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void replaceStreamsMember(
+            List<CoordinatorRecord> records,
+            StreamsGroup group,
+            StreamsGroupMember oldMember,
+            StreamsGroupMember newMember
+    ) {
+        String groupId = group.groupId();
+
+        // Remove the member without canceling its timers in case the change is reverted. If the
+        // change is not reverted, the group validation will fail and the timer will do nothing.
+        records.addAll(removeStreamsMember(groupId, oldMember.memberId()));
+
+        // Generate records.
+        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(
+                groupId,
+                newMember
+        ));
+        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+                groupId,
+                newMember.memberId(),
+                group.targetAssignment(oldMember.memberId(), oldMember.instanceId())

Review Comment:
   @lucasbru Good point!
   
   That makes sense for `replaceStreamsMember`. In that method,
   `oldMember.memberId()` is already the exact target-assignment key, so passing
   `Optional.empty()` is clearer and avoids the extra static-member lookup.
   
   However, I’m less sure that the same change is safe in
   `fromLastTargetAssignment`. In the static-member replacement path, the member
   passed there can be the replacement member with the new member id, while the
   in-memory target assignment may still be keyed by the old member id because
   the replacement records have not been replayed yet.
   
   In that case, the instance-id lookup is what lets us recover the previous
   assignment in the no-recompute path. Does that match your understanding, or
   am I missing another invariant here?
   
   ```java
           if (!canComputeNextTargetAssignment) {
               ...
               return UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
           }
   ```
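   
   To make the concern concrete, here is a minimal, self-contained sketch of the
   lookup behavior I have in mind. It is not the actual `StreamsGroup` code:
   `TargetAssignmentLookupSketch`, `putMember`, the two maps, and the task-name
   strings are hypothetical stand-ins, and the lookup order (exact member id
   first, then the instance id) is my assumption about how
   `targetAssignment(memberId, instanceId)` behaves.
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical, simplified model; NOT the real StreamsGroup implementation.
   class TargetAssignmentLookupSketch {
       // Target assignment keyed by member id (task names stand in for the real type).
       private final Map<String, List<String>> assignmentsByMemberId = new HashMap<>();
       // Static membership: instance id -> member id currently known in memory.
       private final Map<String, String> memberIdByInstanceId = new HashMap<>();

       void putMember(String memberId, Optional<String> instanceId, List<String> tasks) {
           assignmentsByMemberId.put(memberId, tasks);
           instanceId.ifPresent(id -> memberIdByInstanceId.put(id, memberId));
       }

       // Assumed order: exact member id first, then the member registered for the instance id.
       List<String> targetAssignment(String memberId, Optional<String> instanceId) {
           List<String> exact = assignmentsByMemberId.get(memberId);
           if (exact != null) {
               return exact;
           }
           return instanceId
               .map(memberIdByInstanceId::get)
               .map(assignmentsByMemberId::get)
               .orElse(List.of());
       }

       public static void main(String[] args) {
           TargetAssignmentLookupSketch group = new TargetAssignmentLookupSketch();
           // In-memory state is still keyed by the old member id (records not replayed yet).
           group.putMember("old-member", Optional.of("instance-1"), List.of("task-0", "task-1"));

           // replaceStreamsMember case: the old member id is the exact key.
           System.out.println(group.targetAssignment("old-member", Optional.empty()));
           // fromLastTargetAssignment case with the new member id and no instance id.
           System.out.println(group.targetAssignment("new-member", Optional.empty()));
           // Same case, but the instance id recovers the previous assignment.
           System.out.println(group.targetAssignment("new-member", Optional.of("instance-1")));
       }
   }
   ```
   
   Under these assumptions, only the last call recovers the old assignment for
   the replacement member, which is why I would keep the instance id in
   `fromLastTargetAssignment`.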
   What do you think?


