jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271176157


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private Time time = null;
+        private GroupMetadataManager groupMetadataManager = null;
+        private int offsetMetadataMaxSize = 4096;
+        private MetadataImage metadataImage = null;
+
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder withTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+            this.groupMetadataManager = groupMetadataManager;
+            return this;
+        }
+
+        Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+            this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+            return this;
+        }
+
+        Builder withMetadataImage(MetadataImage metadataImage) {
+            this.metadataImage = metadataImage;
+            return this;
+        }
+
+        public OffsetMetadataManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+            if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+            if (time == null) time = Time.SYSTEM;
+
+            if (groupMetadataManager == null) {
+                throw new IllegalArgumentException("GroupMetadataManager 
cannot be null");
+            }
+
+            return new OffsetMetadataManager(
+                snapshotRegistry,
+                logContext,
+                time,
+                metadataImage,
+                groupMetadataManager,
+                offsetMetadataMaxSize
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The system time.
+     */
+    private final Time time;
+
+    /**
+     * The metadata image.
+     */
+    private MetadataImage metadataImage;
+
+    /**
+     * The group metadata manager.
+     */
+    private final GroupMetadataManager groupMetadataManager;
+
+    /**
+     * The maximum allowed metadata for any offset commit.
+     */
+    private final int offsetMetadataMaxSize;
+
+    /**
+     * The offsets keyed by topic-partition and group id.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<TopicPartition, 
OffsetAndMetadata>> offsetsByGroup;
+
+    OffsetMetadataManager(
+        SnapshotRegistry snapshotRegistry,
+        LogContext logContext,
+        Time time,
+        MetadataImage metadataImage,
+        GroupMetadataManager groupMetadataManager,
+        int offsetMetadataMaxSize
+    ) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.log = logContext.logger(OffsetMetadataManager.class);
+        this.time = time;
+        this.metadataImage = metadataImage;
+        this.groupMetadataManager = groupMetadataManager;
+        this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+        this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Validates an OffsetCommit request.
+     *
+     * @param context The request context.
+     * @param request The actual request.
+     */
+    private void validateOffsetCommit(
+        RequestContext context,
+        OffsetCommitRequestData request
+    ) throws ApiException {
+        Group group;
+        try {
+            // If the group does not exist and generation id is -1, the 
request comes from
+            // either the admin client or a consumer which does not use the 
group management
+            // facility. In this case, a so-called simple group is created and 
the request
+            // is accepted.
+            group = groupMetadataManager.getOrMaybeCreateSimpleGroup(
+                request.groupId(),
+                request.generationIdOrMemberEpoch() < 0
+            );
+        } catch (GroupIdNotFoundException ex) {
+            // Maintain backward compatibility. This is a bit weird in the
+            // context of the new protocol though.
+            throw Errors.ILLEGAL_GENERATION.exception();
+        }
+
+        // Validate the request based on the group type.
+        switch (group.type()) {
+            case GENERIC:
+                validateOffsetCommitForGenericGroup(
+                    (GenericGroup) group,
+                    request
+                );
+                break;
+
+            case CONSUMER:
+                validateOffsetCommitForConsumerGroup(
+                    (ConsumerGroup) group,
+                    context,
+                    request
+                );
+                break;
+        }
+    }
+
+    /**
+     * Validates an OffsetCommit request for a generic group.
+     *
+     * @param group     The generic group.
+     * @param request   The actual request.
+     */
+    public void validateOffsetCommitForGenericGroup(
+        GenericGroup group,
+        OffsetCommitRequestData request
+    ) throws KafkaException {
+        if (group.isInState(DEAD)) {
+            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+        }
+
+        if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) 
{
+            // When the generation id is -1, the request comes from either the 
admin client
+            // or a consumer which does not use the group management facility. 
In this case,
+            // the request can commit offsets if the group is empty.
+            return;
+        }
+
+        Optional<String> groupInstanceId = 
OffsetCommitRequest.groupInstanceId(request);
+        if (request.generationIdOrMemberEpoch() >= 0 || 
!request.memberId().isEmpty() || groupInstanceId.isPresent()) {
+            // We are validating three things:
+            // 1. If the `groupInstanceId` is present, then it exists and is 
mapped to `memberId`;
+            // 2. The `memberId` exists in the group; and
+            // 3. The `generationId` matches the current generation id.
+            if (groupInstanceId.isPresent()) {
+                String memberId = group.staticMemberId(groupInstanceId.get());
+                if (memberId == null) {
+                    throw Errors.UNKNOWN_MEMBER_ID.exception();
+                } else if (!request.memberId().equals(memberId)) {
+                    throw Errors.FENCED_INSTANCE_ID.exception();
+                }
+            }
+
+            if (!group.hasMemberId(request.memberId())) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+
+            if (request.generationIdOrMemberEpoch() != group.generationId()) {
+                throw Errors.ILLEGAL_GENERATION.exception();
+            }
+        } else if (!group.isInState(EMPTY)) {

Review Comment:
   // If the request does not contain the member id and the generation
   // id (version 0), offset commits are only accepted when the group
   // is not empty.
   
   Does this mean we should throw the error if the group is empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to