dajac commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1265008872
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -330,9 +330,31 @@ public CompletableFuture<SyncGroupResponseData> syncGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>(); + + if (!isGroupIdNotEmpty(request.groupId())) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code())); + + return responseFuture; Review Comment: How about using `CompletableFuture.completedFuture` here in order to be consistent with L330? It is a bit more concise. Then `responseFuture` could be declared afterwards. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + Review Comment: nit: I would remove all those empty lines. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( Review Comment: nit: javadoc? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) 
{ + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. + Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); Review Comment: This is different from what we do in the Scala code. There, the assignments are set only when the write is committed. Could you explain the reasoning? Is this safe? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. 
+ Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); + + if (!membersWithMissingAssignment.isEmpty()) { + log.warn("Setting empty assignments for members {} of {} for generation {}.", + membersWithMissingAssignment, groupId, group.generationId()); + } + + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + // Another member may have joined the group while we were awaiting this callback, + // so we must ensure we are still in the CompletingRebalance state and the same generation + // when it gets invoked. if we have transitioned to another state, then do nothing + if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) { + if (t != null) { + Errors error = Errors.forException(t); + resetAndPropagateAssignmentWithError(group, error); + maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" + + "during SyncGroup (member: " + memberId + ")."); + } else { + // Members' assignments were already updated. Propagate and transition to Stable. 
+ propagateAssignment(group, Errors.NONE); + group.transitionTo(STABLE); + } + } + }); + + List<Record> records = Collections.singletonList( + RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion()) + ); + return new CoordinatorResult<>(records, appendFuture); + } + + } else if (group.isInState(STABLE)) { + removePendingSyncMember(group, memberId); + + // If the group is stable, we just return the current assignment + GenericGroupMember member = group.member(memberId); + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(group.protocolType().orElse(null)) + .setProtocolName(group.protocolName().orElse(null)) + .setAssignment(member.assignment()) + .setErrorCode(Errors.NONE.code())); + + } else if (group.isInState(DEAD)) { + throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId); + } + return EMPTY_RESULT; } + private Optional<Errors> validateSyncGroup( + GenericGroup group, + SyncGroupRequestData request + ) { + if (group.isInState(DEAD)) { + // If the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // finding the correct coordinator and rejoin. + return Optional.of(COORDINATOR_NOT_AVAILABLE); + } else { + Optional<Errors> memberError = validateExistingMember( + group, + request.memberId(), + request.groupInstanceId(), + "sync-group" + ); + if (memberError.isPresent()) { + return memberError; + } else { + if (request.generationId() != group.generationId()) { + return Optional.of(Errors.ILLEGAL_GENERATION); + Review Comment: nit: I would remove all those empty lines. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. 
+ Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); + + if (!membersWithMissingAssignment.isEmpty()) { + log.warn("Setting empty assignments for members {} of {} for generation {}.", + membersWithMissingAssignment, groupId, group.generationId()); + } + + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + // Another member may have joined the group while we were awaiting this callback, + // so we must ensure we are still in the CompletingRebalance state and the same generation + // when it gets invoked. if we have transitioned to another state, then do nothing + if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) { + if (t != null) { + Errors error = Errors.forException(t); + resetAndPropagateAssignmentWithError(group, error); + maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" + + "during SyncGroup (member: " + memberId + ")."); + } else { + // Members' assignments were already updated. Propagate and transition to Stable. 
+ propagateAssignment(group, Errors.NONE); + group.transitionTo(STABLE); + } + } + }); + + List<Record> records = Collections.singletonList( + RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion()) + ); + return new CoordinatorResult<>(records, appendFuture); + } + + } else if (group.isInState(STABLE)) { + removePendingSyncMember(group, memberId); + + // If the group is stable, we just return the current assignment + GenericGroupMember member = group.member(memberId); + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(group.protocolType().orElse(null)) + .setProtocolName(group.protocolName().orElse(null)) + .setAssignment(member.assignment()) + .setErrorCode(Errors.NONE.code())); + + } else if (group.isInState(DEAD)) { + throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId); + } + return EMPTY_RESULT; } + private Optional<Errors> validateSyncGroup( + GenericGroup group, + SyncGroupRequestData request + ) { + if (group.isInState(DEAD)) { + // If the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // finding the correct coordinator and rejoin. + return Optional.of(COORDINATOR_NOT_AVAILABLE); + } else { + Optional<Errors> memberError = validateExistingMember( + group, + request.memberId(), + request.groupInstanceId(), + "sync-group" + ); + if (memberError.isPresent()) { + return memberError; + } else { + if (request.generationId() != group.generationId()) { + return Optional.of(Errors.ILLEGAL_GENERATION); + + } else if (request.protocolType() != null && + group.protocolType().isPresent() && + !group.protocolType().get().equals(request.protocolType()) + ) { Review Comment: Would it make sense to define an helper method for this code? 
We could also reuse it for the next one. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -330,9 +330,31 @@ public CompletableFuture<SyncGroupResponseData> syncGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>(); + + if (!isGroupIdNotEmpty(request.groupId())) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code())); + + return responseFuture; + } + + runtime.scheduleWriteOperation("generic-group-sync", + topicPartitionFor(request.groupId()), + coordinator -> coordinator.genericGroupSync(context, request, responseFuture) + ).exceptionally(exception -> { + log.error("Request {} hit an unexpected exception: {}", + request, exception.getMessage()); Review Comment: As discussed in the other PR, I would remove this because we can also get expected errors here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); Review Comment: nit: It seems that those two would fit on the previous line. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); Review Comment: It is interesting to note that we use two different ways to propagate the errors here. `getOrMaybeCreateGenericGroup` throws an exception that is propagated to the write operation, whereas we complete the response future based on the result of `validateSyncGroup`. I was wondering whether we should try to be consistent with respect to how we handle errors. What are your thoughts on this? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. 
+ Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); + + if (!membersWithMissingAssignment.isEmpty()) { + log.warn("Setting empty assignments for members {} of {} for generation {}.", + membersWithMissingAssignment, groupId, group.generationId()); + } + + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + // Another member may have joined the group while we were awaiting this callback, + // so we must ensure we are still in the CompletingRebalance state and the same generation + // when it gets invoked. if we have transitioned to another state, then do nothing + if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) { + if (t != null) { + Errors error = Errors.forException(t); + resetAndPropagateAssignmentWithError(group, error); + maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" + + "during SyncGroup (member: " + memberId + ")."); + } else { + // Members' assignments were already updated. Propagate and transition to Stable. 
+ propagateAssignment(group, Errors.NONE); + group.transitionTo(STABLE); + } + } + }); + + List<Record> records = Collections.singletonList( + RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion()) + ); + return new CoordinatorResult<>(records, appendFuture); + } + + } else if (group.isInState(STABLE)) { + removePendingSyncMember(group, memberId); + + // If the group is stable, we just return the current assignment + GenericGroupMember member = group.member(memberId); + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(group.protocolType().orElse(null)) + .setProtocolName(group.protocolName().orElse(null)) + .setAssignment(member.assignment()) + .setErrorCode(Errors.NONE.code())); + + } else if (group.isInState(DEAD)) { + throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId); + } + return EMPTY_RESULT; } + private Optional<Errors> validateSyncGroup( + GenericGroup group, + SyncGroupRequestData request + ) { + if (group.isInState(DEAD)) { + // If the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // finding the correct coordinator and rejoin. 
+ return Optional.of(COORDINATOR_NOT_AVAILABLE); + } else { + Optional<Errors> memberError = validateExistingMember( + group, + request.memberId(), + request.groupInstanceId(), + "sync-group" + ); + if (memberError.isPresent()) { + return memberError; + } else { + if (request.generationId() != group.generationId()) { + return Optional.of(Errors.ILLEGAL_GENERATION); + + } else if (request.protocolType() != null && + group.protocolType().isPresent() && + !group.protocolType().get().equals(request.protocolType()) + ) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + + } else if (request.protocolName() != null && + group.protocolName().isPresent() && + !group.protocolName().get().equals(request.protocolName()) + ) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + + } else { + return Optional.empty(); + } + } + } + } + + private void removePendingSyncMember( + GenericGroup group, + String memberId + ) { + group.removePendingSyncMember(memberId); + + String syncKey = syncKey(group.groupId()); + + if (group.generationId() != group.pendingSyncGenerationId()) { + timer.cancel(syncKey); + } else { + if (group.isInState(DEAD) || Review Comment: Have you considered using a switch? That would be cleaner. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. 
+ Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + + Set<String> membersWithMissingAssignment = new HashSet<>(); + group.allMembers().forEach(member -> { + byte[] assignment = assignments.get(member.memberId()); + if (assignment != null) { + member.setAssignment(assignment); + } else { + membersWithMissingAssignment.add(member.memberId()); + member.setAssignment(new byte[0]); + } + }); + + if (!membersWithMissingAssignment.isEmpty()) { + log.warn("Setting empty assignments for members {} of {} for generation {}.", + membersWithMissingAssignment, groupId, group.generationId()); + } + + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + // Another member may have joined the group while we were awaiting this callback, + // so we must ensure we are still in the CompletingRebalance state and the same generation + // when it gets invoked. if we have transitioned to another state, then do nothing + if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) { + if (t != null) { + Errors error = Errors.forException(t); + resetAndPropagateAssignmentWithError(group, error); + maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" + + "during SyncGroup (member: " + memberId + ")."); + } else { + // Members' assignments were already updated. Propagate and transition to Stable. 
+ propagateAssignment(group, Errors.NONE); + group.transitionTo(STABLE); + } + } + }); + + List<Record> records = Collections.singletonList( + RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion()) + ); + return new CoordinatorResult<>(records, appendFuture); + } + + } else if (group.isInState(STABLE)) { + removePendingSyncMember(group, memberId); + + // If the group is stable, we just return the current assignment + GenericGroupMember member = group.member(memberId); + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(group.protocolType().orElse(null)) + .setProtocolName(group.protocolName().orElse(null)) + .setAssignment(member.assignment()) + .setErrorCode(Errors.NONE.code())); + + } else if (group.isInState(DEAD)) { + throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId); + } + return EMPTY_RESULT; } + private Optional<Errors> validateSyncGroup( + GenericGroup group, + SyncGroupRequestData request + ) { + if (group.isInState(DEAD)) { + // If the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // finding the correct coordinator and rejoin. 
+ return Optional.of(COORDINATOR_NOT_AVAILABLE); + } else { + Optional<Errors> memberError = validateExistingMember( + group, + request.memberId(), + request.groupInstanceId(), + "sync-group" + ); + if (memberError.isPresent()) { + return memberError; + } else { + if (request.generationId() != group.generationId()) { + return Optional.of(Errors.ILLEGAL_GENERATION); + + } else if (request.protocolType() != null && + group.protocolType().isPresent() && + !group.protocolType().get().equals(request.protocolType()) + ) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + + } else if (request.protocolName() != null && + group.protocolName().isPresent() && + !group.protocolName().get().equals(request.protocolName()) + ) { + return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL); + + } else { + return Optional.empty(); + } + } + } + } + + private void removePendingSyncMember( + GenericGroup group, + String memberId + ) { + group.removePendingSyncMember(memberId); + + String syncKey = syncKey(group.groupId()); + + if (group.generationId() != group.pendingSyncGenerationId()) { Review Comment: Is this required here? I think that this is required when the timeout expires because we want to ensure that the stale timer has no impact. However it does not seem to be required here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } + return maybeCompleteJoinPhase(group); + } + + public CoordinatorResult<Void, Record> genericGroupSync( + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); + Optional<Errors> errorOpt = validateSyncGroup(group, request); + if (errorOpt.isPresent()) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(errorOpt.get().code())); + + } else if (group.isInState(EMPTY)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + + } else if (group.isInState(PREPARING_REBALANCE)) { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + + } else if (group.isInState(COMPLETING_REBALANCE)) { + group.member(memberId).setAwaitingSyncFuture(responseFuture); + removePendingSyncMember(group, request.memberId()); + + // If this is the leader, then we can attempt to persist state and transition to stable + if (group.isLeader(memberId)) { + log.info("Assignment received from leader {} for group {} for generation {}. " + + "The group has {} members, {} of which are static.", + memberId, groupId, group.generationId(), + group.size(), group.allStaticMemberIds().size()); + + // Fill all members with corresponding assignment. Reset members not specified in + // the assignment to empty assignments. 
+ Map<String, byte[]> assignments = new HashMap<>(); + request.assignments() + .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); Review Comment: nit: We would usually write it as follows: ``` request.assignments().forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()) ); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org