dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1192184441
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -0,0 +1,865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions. + */ +public class GroupMetadataManager { + + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private List<PartitionAssignor> assignors = null; + private TopicsImage topicsImage = null; + private int consumerGroupMaxSize = Integer.MAX_VALUE; + private int consumerGroupHeartbeatIntervalMs = 5000; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withAssignors(List<PartitionAssignor> assignors) { + this.assignors = assignors; + return this; + } + + Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) { + this.consumerGroupMaxSize = consumerGroupMaxSize; + return this; + } + + Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) { + this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; + return this; + } + + Builder withTopicsImage(TopicsImage topicsImage) { + this.topicsImage = topicsImage; + return this; + } + + GroupMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (topicsImage == null) topicsImage = TopicsImage.EMPTY; + + if (assignors == null || assignors.isEmpty()) { + throw new IllegalStateException("Assignors must be set before building."); + } + + return new GroupMetadataManager( + snapshotRegistry, + logContext, + assignors, + topicsImage, + consumerGroupMaxSize, + consumerGroupHeartbeatIntervalMs + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The list of supported assignors. + */ + private final Map<String, PartitionAssignor> assignors; + + /** + * The default assignor used. + */ + private final PartitionAssignor defaultAssignor; + + /** + * The generic and consumer groups keyed by their name. + */ + private final TimelineHashMap<String, Group> groups; + + /** + * The maximum number of members allowed in a single consumer group. + */ + private final int consumerGroupMaxSize; + + /** + * The heartbeat interval for consumer groups. + */ + private final int consumerGroupHeartbeatIntervalMs; + + /** + * The topics metadata (or image). + */ + private TopicsImage topicsImage; + + private GroupMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + List<PartitionAssignor> assignors, + TopicsImage topicsImage, + int consumerGroupMaxSize, + int consumerGroupHeartbeatIntervalMs + ) { + this.log = logContext.logger(GroupMetadataManager.class); + this.snapshotRegistry = snapshotRegistry; + this.topicsImage = topicsImage; + this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); + this.defaultAssignor = assignors.get(0); + this.groups = new TimelineHashMap<>(snapshotRegistry, 0); + this.consumerGroupMaxSize = consumerGroupMaxSize; + this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; + } + + /** + * Gets or maybe creates a consumer group. + * + * @param groupId The group id. + * @param createIfNotExists A boolean indicating whether the group should be + * created if it does not exist. + * + * @return A ConsumerGroup. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a consumer group. + */ + // Package private for testing. + ConsumerGroup getOrMaybeCreateConsumerGroup( + String groupId, + boolean createIfNotExists + ) throws GroupIdNotFoundException { + Group group = groups.get(groupId); + + if (group == null && !createIfNotExists) { + throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); + } + + if (group == null) { + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId); + groups.put(groupId, consumerGroup); + return consumerGroup; + } else { + if (group.type() == Group.GroupType.CONSUMER) { + return (ConsumerGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); + } + } + } + + /** + * Removes the group. + * + * @param groupId The group id. + */ + private void removeGroup( + String groupId + ) { + groups.remove(groupId); + } + + /** + * Validates the request. + * + * @param request The request to validate. + * + * @throws InvalidRequestException if the request is not valid. + * @throws UnsupportedAssignorException if the assignor is not supported. + */ + private void throwIfConsumerGroupHeartbeatRequestIsInvalid( + ConsumerGroupHeartbeatRequestData request + ) throws InvalidRequestException, UnsupportedAssignorException { + if (request.groupId().isEmpty()) { + throw new InvalidRequestException("GroupId can't be empty."); + } + + if (request.memberEpoch() > 0 || request.memberEpoch() == -1) { + if (request.memberId().isEmpty()) { + throw new InvalidRequestException("MemberId can't be empty."); + } + if (request.instanceId() != null) { + throw new InvalidRequestException("InstanceId should only be provided in first request."); + } + if (request.rackId() != null) { + throw new InvalidRequestException("RackId should only be provided in first request."); + } + } else if (request.memberEpoch() == 0) { + if (request.rebalanceTimeoutMs() == -1) { + throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request."); + } + if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) { + throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining."); + } + if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) { + throw new InvalidRequestException("SubscribedTopicNames must be set in first request."); + } + if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) { Review Comment: Good point. It should be checked there as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org