dajac commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1269398879
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. + Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); Review Comment: I am not sure. I lean towards keeping the implementation as it was to avoid any unwanted side effects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org