dajac commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1265008872


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -330,9 +330,31 @@ public CompletableFuture<SyncGroupResponseData> syncGroup(
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        CompletableFuture<SyncGroupResponseData> responseFuture = new 
CompletableFuture<>();
+
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+            return responseFuture;

Review Comment:
   How about using `CompletableFuture.completedFuture` here in order to be 
consistent with L330? It is a bit more concise. Then `responseFuture` could be 
declared afterwards.
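
   A minimal sketch of what this suggestion could look like, assuming a simplified stand-in for the response type. `Response`, `CompletedFutureSketch`, and the numeric error code are hypothetical placeholders, not the actual Kafka classes or values; only `CompletableFuture.completedFuture` is the real JDK call.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureSketch {
    // Hypothetical stand-in for SyncGroupResponseData; it only carries an error code.
    static final class Response {
        final short errorCode;

        Response(short errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Placeholder value used for illustration only.
    static final short INVALID_GROUP_ID = 24;

    static CompletableFuture<Response> syncGroup(String groupId) {
        if (groupId == null || groupId.isEmpty()) {
            // Early return with a future that is already completed.
            return CompletableFuture.completedFuture(new Response(INVALID_GROUP_ID));
        }

        // The pending future is declared only on the path that needs it.
        CompletableFuture<Response> responseFuture = new CompletableFuture<>();
        // ... schedule the write operation that eventually completes responseFuture ...
        return responseFuture;
    }
}
```

   With this shape the early-return branch never touches a mutable future, which is the conciseness gain the comment refers to.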



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+

Review Comment:
   nit: I would remove all those empty lines.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(

Review Comment:
   nit: javadoc?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });

Review Comment:
   This is different from what we do in the Scala code. There, the assignments 
are set only when the write is committed. Could you explain the reasoning? Is 
this safe?
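
   For illustration, a minimal sketch of the alternative the comment alludes to: leaving in-memory assignments untouched until the append future completes successfully. `Member`, `stageAssignments`, and the class name are hypothetical stand-ins, not the PR's code, and the sketch ignores the state and generation checks the real callback performs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DeferredAssignmentSketch {
    // Hypothetical stand-in for a group member.
    static final class Member {
        final String id;
        byte[] assignment = new byte[0];

        Member(String id) {
            this.id = id;
        }
    }

    // Returns the future that the caller completes once the write is committed (or has failed).
    static CompletableFuture<Void> stageAssignments(
        Map<String, Member> members,
        Map<String, byte[]> requested
    ) {
        // Copy the requested assignments so later changes to the request cannot leak in.
        Map<String, byte[]> staged = new HashMap<>(requested);

        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.whenComplete((ignored, error) -> {
            if (error == null) {
                // Only now is the in-memory state updated; members without an
                // entry in the request get an empty assignment.
                members.values().forEach(member ->
                    member.assignment = staged.getOrDefault(member.id, new byte[0]));
            }
            // On failure the members keep whatever assignment they had before.
        });
        return appendFuture;
    }
}
```

   The trade-off is that the record to append has to be built from the staged assignments rather than from the members themselves.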



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} 
for generation {}.",
+                        membersWithMissingAssignment, groupId, 
group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new 
CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were 
awaiting this callback,
+                    // so we must ensure we are still in the 
CompletingRebalance state and the same generation
+                    // when it gets invoked. if we have transitioned to 
another state, then do nothing
+                    if (group.isInState(COMPLETING_REBALANCE) && 
request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error 
" + error + " when storing group assignment" +
+                                "during SyncGroup (member: " + memberId + 
").");
+                        } else {
+                            // Members' assignments were already updated. 
Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {
+            removePendingSyncMember(group, memberId);
+
+            // If the group is stable, we just return the current assignment
+            GenericGroupMember member = group.member(memberId);
+            responseFuture.complete(new SyncGroupResponseData()
+                .setProtocolType(group.protocolType().orElse(null))
+                .setProtocolName(group.protocolName().orElse(null))
+                .setAssignment(member.assignment())
+                .setErrorCode(Errors.NONE.code()));
+
+        } else if (group.isInState(DEAD)) {
+            throw new IllegalStateException("Reached unexpected condition for 
Dead group " + groupId);
+        }
+
         return EMPTY_RESULT;
     }
 
+    private Optional<Errors> validateSyncGroup(
+        GenericGroup group,
+        SyncGroupRequestData request
+    ) {
+        if (group.isInState(DEAD)) {
+            // If the group is marked as dead, it means some other thread has 
just removed the group
+            // from the coordinator metadata; this is likely that the group 
has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let 
the member retry
+            // finding the correct coordinator and rejoin.
+            return Optional.of(COORDINATOR_NOT_AVAILABLE);
+        } else {
+            Optional<Errors> memberError = validateExistingMember(
+                group,
+                request.memberId(),
+                request.groupInstanceId(),
+                "sync-group"
+            );
+            if (memberError.isPresent()) {
+                return memberError;
+            } else {
+                if (request.generationId() != group.generationId()) {
+                    return Optional.of(Errors.ILLEGAL_GENERATION);
+

Review Comment:
   nit: I would remove all those empty lines.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} 
for generation {}.",
+                        membersWithMissingAssignment, groupId, 
group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new 
CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were 
awaiting this callback,
+                    // so we must ensure we are still in the 
CompletingRebalance state and the same generation
+                    // when it gets invoked. if we have transitioned to 
another state, then do nothing
+                    if (group.isInState(COMPLETING_REBALANCE) && 
request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error 
" + error + " when storing group assignment" +
+                                "during SyncGroup (member: " + memberId + 
").");
+                        } else {
+                            // Members' assignments were already updated. 
Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {
+            removePendingSyncMember(group, memberId);
+
+            // If the group is stable, we just return the current assignment
+            GenericGroupMember member = group.member(memberId);
+            responseFuture.complete(new SyncGroupResponseData()
+                .setProtocolType(group.protocolType().orElse(null))
+                .setProtocolName(group.protocolName().orElse(null))
+                .setAssignment(member.assignment())
+                .setErrorCode(Errors.NONE.code()));
+
+        } else if (group.isInState(DEAD)) {
+            throw new IllegalStateException("Reached unexpected condition for 
Dead group " + groupId);
+        }
+
         return EMPTY_RESULT;
     }
 
+    private Optional<Errors> validateSyncGroup(
+        GenericGroup group,
+        SyncGroupRequestData request
+    ) {
+        if (group.isInState(DEAD)) {
+            // If the group is marked as dead, it means some other thread has 
just removed the group
+            // from the coordinator metadata; this is likely that the group 
has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let 
the member retry
+            // finding the correct coordinator and rejoin.
+            return Optional.of(COORDINATOR_NOT_AVAILABLE);
+        } else {
+            Optional<Errors> memberError = validateExistingMember(
+                group,
+                request.memberId(),
+                request.groupInstanceId(),
+                "sync-group"
+            );
+            if (memberError.isPresent()) {
+                return memberError;
+            } else {
+                if (request.generationId() != group.generationId()) {
+                    return Optional.of(Errors.ILLEGAL_GENERATION);
+
+                } else if (request.protocolType() != null &&
+                    group.protocolType().isPresent() &&
+                    !group.protocolType().get().equals(request.protocolType())
+                ) {

Review Comment:
   Would it make sense to define a helper method for this code? We could also 
reuse it for the next one.
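
   A minimal sketch of such a helper, assuming the same three-part condition is shared by the protocol type and protocol name checks. The name `isMismatch` and the enclosing class are hypothetical.

```java
import java.util.Optional;

public class ProtocolCheckSketch {
    // True only when the request supplies a value, the group has one,
    // and the two differ. A null request value or an absent group value
    // is treated as "nothing to compare".
    static boolean isMismatch(String requested, Optional<String> current) {
        return requested != null
            && current.isPresent()
            && !current.get().equals(requested);
    }
}
```

   Both branches could then call the helper with `request.protocolType()` / `group.protocolType()` and `request.protocolName()` / `group.protocolName()` respectively.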



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -330,9 +330,31 @@ public CompletableFuture<SyncGroupResponseData> syncGroup(
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        CompletableFuture<SyncGroupResponseData> responseFuture = new 
CompletableFuture<>();
+
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+            return responseFuture;
+        }
+
+        runtime.scheduleWriteOperation("generic-group-sync",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.genericGroupSync(context, request, 
responseFuture)
+        ).exceptionally(exception -> {
+            log.error("Request {} hit an unexpected exception: {}",
+                request, exception.getMessage());

Review Comment:
   As discussed in the other PR, I would remove this because we can also get 
expected errors here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());

Review Comment:
   nit: It seems that those two would fit on the previous line.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));

Review Comment:
   It is interesting to note that we use two different ways to propagate the 
errors here. `getOrMaybeCreateGenericGroup` throws an exception that is 
propagated to the write operation whereas we complete the write future based on 
the result of `validateSyncGroup`. I was wondering whether we should try to be 
consistent wrt. how we handle errors. What are your thoughts on this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} 
for generation {}.",
+                        membersWithMissingAssignment, groupId, 
group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new 
CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were 
awaiting this callback,
+                    // so we must ensure we are still in the 
CompletingRebalance state and the same generation
+                    // when it gets invoked. if we have transitioned to 
another state, then do nothing
+                    if (group.isInState(COMPLETING_REBALANCE) && 
request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error 
" + error + " when storing group assignment" +
+                                "during SyncGroup (member: " + memberId + 
").");
+                        } else {
+                            // Members' assignments were already updated. 
Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {
+            removePendingSyncMember(group, memberId);
+
+            // If the group is stable, we just return the current assignment
+            GenericGroupMember member = group.member(memberId);
+            responseFuture.complete(new SyncGroupResponseData()
+                .setProtocolType(group.protocolType().orElse(null))
+                .setProtocolName(group.protocolName().orElse(null))
+                .setAssignment(member.assignment())
+                .setErrorCode(Errors.NONE.code()));
+
+        } else if (group.isInState(DEAD)) {
+            throw new IllegalStateException("Reached unexpected condition for 
Dead group " + groupId);
+        }
+
         return EMPTY_RESULT;
     }
 
+    private Optional<Errors> validateSyncGroup(
+        GenericGroup group,
+        SyncGroupRequestData request
+    ) {
+        if (group.isInState(DEAD)) {
+            // If the group is marked as dead, it means some other thread has 
just removed the group
+            // from the coordinator metadata; this is likely that the group 
has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let 
the member retry
+            // finding the correct coordinator and rejoin.
+            return Optional.of(COORDINATOR_NOT_AVAILABLE);
+        } else {
+            Optional<Errors> memberError = validateExistingMember(
+                group,
+                request.memberId(),
+                request.groupInstanceId(),
+                "sync-group"
+            );
+            if (memberError.isPresent()) {
+                return memberError;
+            } else {
+                if (request.generationId() != group.generationId()) {
+                    return Optional.of(Errors.ILLEGAL_GENERATION);
+
+                } else if (request.protocolType() != null &&
+                    group.protocolType().isPresent() &&
+                    !group.protocolType().get().equals(request.protocolType())
+                ) {
+                    return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
+
+                } else if (request.protocolName() != null &&
+                    group.protocolName().isPresent() &&
+                    !group.protocolName().get().equals(request.protocolName())
+                ) {
+                    return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
+
+                } else {
+                    return Optional.empty();
+                }
+            }
+        }
+    }
+
+    private void removePendingSyncMember(
+        GenericGroup group,
+        String memberId
+    ) {
+        group.removePendingSyncMember(memberId);
+
+        String syncKey = syncKey(group.groupId());
+
+        if (group.generationId() != group.pendingSyncGenerationId()) {
+            timer.cancel(syncKey);
+        } else {
+            if (group.isInState(DEAD) ||

Review Comment:
   Have you considered using a switch? That would be cleaner.
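
   A minimal sketch of the switch form, not the PR's code. The quoted condition
   is truncated after `DEAD`, so the grouping of states below is an assumption,
   and `SketchState`, `SyncTimerSwitchSketch`, `shouldCancel` and
   `allMembersSynced` are hypothetical names used only for illustration:

   ```java
   // Hypothetical stand-in for the group states named in the diff.
   enum SketchState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

   final class SyncTimerSwitchSketch {
       // Returns true when the pending-sync timer should be cancelled.
       // ASSUMPTION: which states share a branch is a guess, because the
       // original if-condition is cut off in the quoted hunk.
       static boolean shouldCancel(SketchState state, boolean allMembersSynced) {
           switch (state) {
               case DEAD:
               case EMPTY:
               case PREPARING_REBALANCE:
                   // Nothing left to wait for in these states.
                   return true;
               case COMPLETING_REBALANCE:
               case STABLE:
                   // Cancel only once no member is still pending.
                   return allMembersSynced;
               default:
                   throw new IllegalStateException("Unknown group state: " + state);
           }
       }
   }
   ```

   With a switch, each state appears exactly once and the `default` branch makes
   an unhandled state fail loudly instead of falling through silently.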



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} 
for generation {}.",
+                        membersWithMissingAssignment, groupId, 
group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new 
CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were 
awaiting this callback,
+                    // so we must ensure we are still in the 
CompletingRebalance state and the same generation
+                    // when it gets invoked. if we have transitioned to 
another state, then do nothing
+                    if (group.isInState(COMPLETING_REBALANCE) && 
request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error 
" + error + " when storing group assignment" +
+                                "during SyncGroup (member: " + memberId + 
").");
+                        } else {
+                            // Members' assignments were already updated. 
Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {
+            removePendingSyncMember(group, memberId);
+
+            // If the group is stable, we just return the current assignment
+            GenericGroupMember member = group.member(memberId);
+            responseFuture.complete(new SyncGroupResponseData()
+                .setProtocolType(group.protocolType().orElse(null))
+                .setProtocolName(group.protocolName().orElse(null))
+                .setAssignment(member.assignment())
+                .setErrorCode(Errors.NONE.code()));
+
+        } else if (group.isInState(DEAD)) {
+            throw new IllegalStateException("Reached unexpected condition for 
Dead group " + groupId);
+        }
+
         return EMPTY_RESULT;
     }
 
+    private Optional<Errors> validateSyncGroup(
+        GenericGroup group,
+        SyncGroupRequestData request
+    ) {
+        if (group.isInState(DEAD)) {
+            // If the group is marked as dead, it means some other thread has 
just removed the group
+            // from the coordinator metadata; this is likely that the group 
has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let 
the member retry
+            // finding the correct coordinator and rejoin.
+            return Optional.of(COORDINATOR_NOT_AVAILABLE);
+        } else {
+            Optional<Errors> memberError = validateExistingMember(
+                group,
+                request.memberId(),
+                request.groupInstanceId(),
+                "sync-group"
+            );
+            if (memberError.isPresent()) {
+                return memberError;
+            } else {
+                if (request.generationId() != group.generationId()) {
+                    return Optional.of(Errors.ILLEGAL_GENERATION);
+
+                } else if (request.protocolType() != null &&
+                    group.protocolType().isPresent() &&
+                    !group.protocolType().get().equals(request.protocolType())
+                ) {
+                    return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
+
+                } else if (request.protocolName() != null &&
+                    group.protocolName().isPresent() &&
+                    !group.protocolName().get().equals(request.protocolName())
+                ) {
+                    return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
+
+                } else {
+                    return Optional.empty();
+                }
+            }
+        }
+    }
+
+    private void removePendingSyncMember(
+        GenericGroup group,
+        String memberId
+    ) {
+        group.removePendingSyncMember(memberId);
+
+        String syncKey = syncKey(group.groupId());
+
+        if (group.generationId() != group.pendingSyncGenerationId()) {

Review Comment:
   Is this check required here? I think it is needed when the timeout expires,
   because we want to ensure that a stale timer has no impact. However, it does
   not seem to be required here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and 
transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset 
members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));

Review Comment:
   nit: We would usually write it as follows:
   ```
   request.assignments().forEach(assignment ->
       assignments.put(assignment.memberId(), assignment.assignment())
   );
   ```




