jeffkbkim commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1267462476
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. 
+ Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); + + if (!membersWithMissingAssignment.isEmpty()) { + log.warn("Setting empty assignments for members {} of {} for generation {}.", + membersWithMissingAssignment, groupId, group.generationId()); + } + + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + // Another member may have joined the group while we were awaiting this callback, + // so we must ensure we are still in the CompletingRebalance state and the same generation + // when it gets invoked. if we have transitioned to another state, then do nothing + if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) { + if (t != null) { + Errors error = Errors.forException(t); + resetAndPropagateAssignmentWithError(group, error); + maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" + + "during SyncGroup (member: " + memberId + ")."); + } else { + // Members' assignments were already updated. Propagate and transition to Stable. 
+ propagateAssignment(group, Errors.NONE); + group.transitionTo(STABLE); + } + } + }); + + List<Record> records = Collections.singletonList( + RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion()) + ); + return new CoordinatorResult<>(records, appendFuture); + } + + } else if (group.isInState(STABLE)) { + removePendingSyncMember(group, memberId); + + // If the group is stable, we just return the current assignment + GenericGroupMember member = group.member(memberId); + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(group.protocolType().orElse(null)) + .setProtocolName(group.protocolName().orElse(null)) + .setAssignment(member.assignment()) + .setErrorCode(Errors.NONE.code())); + + } else if (group.isInState(DEAD)) { + throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId); + } + return EMPTY_RESULT; } + private Optional<Errors> validateSyncGroup( + GenericGroup group, + SyncGroupRequestData request + ) { + if (group.isInState(DEAD)) { + // If the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // finding the correct coordinator and rejoin. 
+ return Optional.of(COORDINATOR_NOT_AVAILABLE); + } else { + Optional<Errors> memberError = validateExistingMember( + group, + request.memberId(), + request.groupInstanceId(), + "sync-group" + ); + if (memberError.isPresent()) { + return memberError; + } else { + if (request.generationId() != group.generationId()) { + return Optional.of(Errors.ILLEGAL_GENERATION); + + } else if (request.protocolType() != null && + group.protocolType().isPresent() && + !group.protocolType().get().equals(request.protocolType()) + ) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + + } else if (request.protocolName() != null && + group.protocolName().isPresent() && + !group.protocolName().get().equals(request.protocolName()) + ) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + + } else { + return Optional.empty(); + } + } + } + } + + private void removePendingSyncMember( + GenericGroup group, + String memberId + ) { + group.removePendingSyncMember(memberId); + + String syncKey = syncKey(group.groupId()); + + if (group.generationId() != group.pendingSyncGenerationId()) { Review Comment: So that I understand: let's say that there is a pending sync with an invalid generation id that has expired. Since GroupMetadataManager#expirePendingSync already checks for and ignores stale generation ids, we don't need to check and cancel it here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org