jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1270937110


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });

Review Comment:
   this was reverted to reflect the existing behavior



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to