dajac commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580920772
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1288,25 +1396,10 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) .setClientId(clientId) .setClientHost(clientHost) + .setClassicMemberMetadata(null) .build(); - boolean bumpGroupEpoch = false; - if (!updatedMember.equals(member)) { - records.add(newMemberSubscriptionRecord(groupId, updatedMember)); - - if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { - log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", - groupId, memberId, updatedMember.subscribedTopicNames()); - bumpGroupEpoch = true; - } - - if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { - log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", - groupId, memberId, updatedMember.subscribedTopicRegex()); - bumpGroupEpoch = true; - } - } - + boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); Review Comment: nit: The name does not really represent the intent here. How about `hasMemberChanged` or something along those lines? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1122,4 +1122,27 @@ private <IN, OUT> OUT handleOperationException( return handler.apply(apiError.error(), apiError.message()); } } + + /** + * Creates the JoinGroupResponseData according to the error type. + * + * @param memberId The member id. + * @param error The error. + * @return The JoinGroupResponseData. 
+ */ + private static JoinGroupResponseData createJoinGroupResponseData( + String memberId, + Errors error + ) { + switch (error) { + case MEMBER_ID_REQUIRED: + case INVALID_SESSION_TIMEOUT: + return new JoinGroupResponseData() + .setMemberId(memberId) Review Comment: I actually wonder why we set the member id in the `INVALID_SESSION_TIMEOUT` case. Looking at the client code, we don't use it in the java client. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1092,6 +1097,86 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } + /** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ + private void throwIfClassicProtocolIsNotSupported( + ConsumerGroup group, + String memberId, + String protocolType, + JoinGroupRequestProtocolCollection protocols + ) { + if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); + } + } + + /** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. 
+ */ + private static ConsumerPartitionAssignor.Subscription deserializeSubscription( + JoinGroupRequestProtocolCollection protocols + ) { + try { + return ConsumerProtocol.deserializeSubscription( + ByteBuffer.wrap(protocols.stream().findAny().get().metadata()) + ); + } catch (SchemaException e) { + throw new IllegalStateException("Malformed embedded consumer protocol."); + } + } + + /** + * Validates the generation id and returns the owned partitions in the JoinGroupRequest to a consumer group. + * + * @param member The joining member. + * @param subscription The Subscription. + * @return The owned partitions if valid, otherwise return null. + */ + private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> validateGenerationIdAndGetOwnedPartition( + ConsumerGroupMember member, + ConsumerPartitionAssignor.Subscription subscription + ) { + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedPartitions = + toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()); + if (subscription.generationId().isPresent() && subscription.generationId().get() == member.memberEpoch()) { + return ownedPartitions; + } else { + // If the generation id is not provided or doesn't match the member epoch, it's still safe to + // accept the ownedPartitions that is a subset of the assigned partition. Otherwise, set the + // ownedPartition to be null. When a new assignment is provided, the consumer will stop fetching + // from and revoke the partitions it does not own. + if (isSubset(ownedPartitions, member.assignedPartitions())) { + return ownedPartitions; + } else { + return null; + } + } + } + + /** + * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list converted from the TopicPartitions list. 
+ */ + private static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPartitions( + List<TopicPartition> partitions, + TopicsImage topicsImage + ) { + return ConsumerGroup.topicPartitionMapFromList(partitions, topicsImage).entrySet().stream().map( Review Comment: You could do something like this: ``` Map<Uuid, ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitionMap = new HashMap<>(); partitions.forEach(topicPartition -> { TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); if (topicImage != null) { topicPartitionMap .computeIfAbsent(topicImage.id(), __ -> new ConsumerGroupHeartbeatRequestData.TopicPartitions().setTopic(...)) .partitions() .add(topicPartition.partition()); } }); ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1092,6 +1097,86 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } + /** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ + private void throwIfClassicProtocolIsNotSupported( + ConsumerGroup group, + String memberId, + String protocolType, + JoinGroupRequestProtocolCollection protocols + ) { + if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); + } + } + + /** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. 
+ */ + private static ConsumerPartitionAssignor.Subscription deserializeSubscription( + JoinGroupRequestProtocolCollection protocols + ) { + try { + return ConsumerProtocol.deserializeSubscription( + ByteBuffer.wrap(protocols.stream().findAny().get().metadata()) + ); + } catch (SchemaException e) { + throw new IllegalStateException("Malformed embedded consumer protocol."); + } + } + + /** + * Validates the generation id and returns the owned partitions in the JoinGroupRequest to a consumer group. + * + * @param member The joining member. + * @param subscription The Subscription. + * @return The owned partitions if valid, otherwise return null. + */ + private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> validateGenerationIdAndGetOwnedPartition( Review Comment: We definitely need it on the sync path. Coming back to my initial point, would it work if we just pass the received owned partitions to the assignment reconciler? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1336,6 +1422,233 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. 
+ */ + private CoordinatorResult<Void, Record> consumerGroupJoin( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); + List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions = null; + + if (!validateClassicGroupSessionTimeout(memberId, request.sessionTimeoutMs(), responseFuture)) { + return EMPTY_RESULT; + } + throwIfConsumerGroupIsFull(group, memberId); + // TODO: need to throw an exception if group is dead? + + // Get or create the member. + if (isUnknownMember) memberId = Uuid.randomUuid().toString(); + ConsumerGroupMember member; + ConsumerGroupMember.Builder updatedMemberBuilder; + boolean staticMemberReplaced = false; + boolean newMemberCreated = false; + if (instanceId == null) { + if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { + // If member id required, send back a response to call for another join group request with allocated member id. + log.info("Dynamic member with unknown member id joins group {}. " + + "Created a new member id {} and requesting the member to rejoin with this id.", + group.groupId(), memberId); + + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(memberId) + .setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) + ); + return EMPTY_RESULT; + } else { + // A dynamic member (re-)joins. 
+ throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + newMemberCreated = !group.hasMember(memberId); + + member = group.getOrMaybeCreateMember(memberId, true); + if (!newMemberCreated) ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); + log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + } + } else { + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + member = group.staticMember(instanceId); + // A new static member joins or the existing static member rejoins. + if (isUnknownMember) { + newMemberCreated = true; + if (member == null) { + // New static member. + member = group.getOrMaybeCreateMember(memberId, true); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); + } else { + // Replace the current static member. + staticMemberReplaced = true; + updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) + .setAssignedPartitions(member.assignedPartitions()); + ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); + removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } else { + // Rejoining static member. Fence the static group with unmatched member id. 
+ throwIfStaticMemberIsUnknown(member, instanceId); + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + + ownedTopicPartitions = validateGenerationIdAndGetOwnedPartition(member, subscription); + updatedMemberBuilder = new ConsumerGroupMember.Builder(member); + log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); + } + } + + int groupEpoch = group.groupEpoch(); + Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata(); + + // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue + // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have + // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue + // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have + // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition. 
+ ConsumerGroupMember updatedMember = updatedMemberBuilder + .maybeUpdateInstanceId(Optional.ofNullable(instanceId)) + .maybeUpdateRackId(subscription.rackId()) + .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs())) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) + .setSupportedClassicProtocols(protocols) + .build(); + + boolean bumpGroupEpoch = false; + if (!updatedMember.equals(member)) { + records.add(newMemberSubscriptionRecord(groupId, updatedMember)); + + if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { + log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", + groupId, memberId, updatedMember.subscribedTopicNames()); + bumpGroupEpoch = true; + } + + if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { + log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", + groupId, memberId, updatedMember.subscribedTopicRegex()); + bumpGroupEpoch = true; + } + } + + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. 
+ subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + groupId, subscriptionMetadata); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { + groupEpoch += 1; + records.add(newGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); + } + + // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member + // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. + int targetAssignmentEpoch = group.assignmentEpoch(); + Assignment targetAssignment = group.targetAssignment(memberId); + if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { + String preferredServerAssignor = group.computePreferredServerAssignor( + member, + updatedMember + ).orElse(defaultAssignor.name()); + try { + TargetAssignmentBuilder assignmentResultBuilder = + new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()) + .addOrUpdateMember(memberId, updatedMember); + TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; + // A new static member is replacing an older one with the same subscriptions. + // We just need to remove the older member and add the newer one. 
The new member should + // reuse the target assignment of the older member. + if (staticMemberReplaced) { + assignmentResult = assignmentResultBuilder + .removeMember(member.memberId()) + .build(); + } else { + assignmentResult = assignmentResultBuilder + .build(); + } + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + groupId, groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + records.addAll(assignmentResult.records()); + targetAssignment = assignmentResult.targetAssignment().get(memberId); + targetAssignmentEpoch = groupEpoch; + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", groupId, msg); + throw new UnknownServerException(msg, ex); + } + } + + // 3. Reconcile the member's assignment with the target assignment if the member is not + // fully reconciled yet. + updatedMember = maybeReconcile( + groupId, + updatedMember, + group::currentPartitionEpoch, + targetAssignmentEpoch, + targetAssignment, + ownedTopicPartitions, + records + ); + + if (newMemberCreated) { + scheduleConsumerGroupSessionTimeout(groupId, memberId); + } + + // TODO: what will happen to the client if the generation id in response is 0? + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(updatedMember.memberId()) + .setGenerationId(updatedMember.memberEpoch()) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setProtocolName(protocols.stream().findAny().get().name()) + ); Review Comment: Yeah, I understand this but I don't like it. I think that we can keep it as it is in order to minimize the changes in this PR. We can save this for later. That being said, my initial comment still stands. You cannot complete the future immediately here. You need to use a "write future" and complete the "response future" only when the write is completed. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1122,4 +1122,27 @@ private <IN, OUT> OUT handleOperationException( return handler.apply(apiError.error(), apiError.message()); } } + + /** + * Creates the JoinGroupResponseData according to the error type. + * + * @param memberId The member id. + * @param error The error. + * @return The JoinGroupResponseData. + */ + private static JoinGroupResponseData createJoinGroupResponseData( + String memberId, + Errors error + ) { + switch (error) { + case MEMBER_ID_REQUIRED: Review Comment: Does it really work? The member id in the request will be empty in this case as we generate it on the server and return it. I think that we should keep it as it was before. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr return new CoordinatorResult<>(records, response); } + /** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param context The request context. + * @param request The actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. 
+ */ + private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup( + ConsumerGroup group, + RequestContext context, + JoinGroupRequestData request, + CompletableFuture<JoinGroupResponseData> responseFuture + ) throws ApiException { + final long currentTimeMs = time.milliseconds(); + final List<Record> records = new ArrayList<>(); + final String groupId = request.groupId(); + String memberId = request.memberId(); + final String instanceId = request.groupInstanceId(); + final JoinGroupRequestProtocolCollection protocols = request.protocols(); + final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + + throwIfConsumerGroupIsFull(group, memberId); + throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); + // TODO: need to throw an exception if group is dead? Review Comment: Could we really end up with a dead group here? If the group is empty, we should have taken the regular classic path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org