jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1267462476


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} 
for generation {}.",
+                        membersWithMissingAssignment, groupId, 
group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new 
CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were 
awaiting this callback,
+                    // so we must ensure we are still in the 
CompletingRebalance state and the same generation
+                    // when it gets invoked. if we have transitioned to 
another state, then do nothing
+                    if (group.isInState(COMPLETING_REBALANCE) && 
request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error 
" + error + " when storing group assignment" +
+                                "during SyncGroup (member: " + memberId + 
").");
+                        } else {
+                            // Members' assignments were already updated. 
Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {
+            removePendingSyncMember(group, memberId);
+
+            // If the group is stable, we just return the current assignment
+            GenericGroupMember member = group.member(memberId);
+            responseFuture.complete(new SyncGroupResponseData()
+                .setProtocolType(group.protocolType().orElse(null))
+                .setProtocolName(group.protocolName().orElse(null))
+                .setAssignment(member.assignment())
+                .setErrorCode(Errors.NONE.code()));
+
+        } else if (group.isInState(DEAD)) {
+            throw new IllegalStateException("Reached unexpected condition for 
Dead group " + groupId);
+        }
+
         return EMPTY_RESULT;
     }
 
+    private Optional<Errors> validateSyncGroup(
+        GenericGroup group,
+        SyncGroupRequestData request
+    ) {
+        if (group.isInState(DEAD)) {
+            // If the group is marked as dead, it means some other thread has 
just removed the group
+            // from the coordinator metadata; this is likely that the group 
has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let 
the member retry
+            // finding the correct coordinator and rejoin.
+            return Optional.of(COORDINATOR_NOT_AVAILABLE);
+        } else {
+            Optional<Errors> memberError = validateExistingMember(
+                group,
+                request.memberId(),
+                request.groupInstanceId(),
+                "sync-group"
+            );
+            if (memberError.isPresent()) {
+                return memberError;
+            } else {
+                if (request.generationId() != group.generationId()) {
+                    return Optional.of(Errors.ILLEGAL_GENERATION);
+
+                } else if (request.protocolType() != null &&
+                    group.protocolType().isPresent() &&
+                    !group.protocolType().get().equals(request.protocolType())
+                ) {
+                    return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
+
+                } else if (request.protocolName() != null &&
+                    group.protocolName().isPresent() &&
+                    !group.protocolName().get().equals(request.protocolName())
+                ) {
+                    return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
+
+                } else {
+                    return Optional.empty();
+                }
+            }
+        }
+    }
+
+    private void removePendingSyncMember(
+        GenericGroup group,
+        String memberId
+    ) {
+        group.removePendingSyncMember(memberId);
+
+        String syncKey = syncKey(group.groupId());
+
+        if (group.generationId() != group.pendingSyncGenerationId()) {

Review Comment:
   So that I understand: let's say that there is a pending sync with an invalid 
generation id that expired. Since GroupMetadataManager#expirePendingSync 
already checks for and ignores stale generation ids, we don't need to check and 
cancel it here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to