AndrewJSchofield commented on code in PR #16516:
URL: https://github.com/apache/kafka/pull/16516#discussion_r1663998177


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.share;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A Share Group.
+ */
+public class ShareGroup extends ModernGroup<ShareGroupMember> {
+
+    private static final String PROTOCOL_TYPE = "share";
+
+    public enum ShareGroupState {
+        EMPTY("Empty"),
+        STABLE("Stable"),
+        DEAD("Dead"),
+        UNKNOWN("Unknown");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        ShareGroupState(String name) {
+            this.name = name;
+            this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<ShareGroupState> state;
+
+    public ShareGroup(
+        SnapshotRegistry snapshotRegistry,
+        String groupId
+    ) {
+        super(snapshotRegistry, groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, 
ShareGroupState.EMPTY);
+    }
+
+    /**
+     * @return The group type (Share).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.SHARE;
+    }
+
+    /**
+     * @return The group protocol type (share).
+     */
+    @Override
+    public String protocolType() {
+        return PROTOCOL_TYPE;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return The current state.
+     */
+    public ShareGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The current state based on committed offset.
+     */
+    public ShareGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * Gets or creates a member.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ShareGroupMember.
+     */
+    public ShareGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        ShareGroupMember member = members.get(memberId);
+        if (member != null) return member;
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId));
+        }
+
+        return new ShareGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Updates the member.
+     *
+     * @param newMember The new share group member.
+     */
+    @Override
+    public void updateMember(ShareGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+
+        ShareGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
newMember);
+        maybeUpdatePartitionEpoch(oldMember, newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        ShareGroupMember oldMember = members.remove(memberId);
+        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
null);
+        maybeRemovePartitionEpoch(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    @Override
+    public void validateOffsetCommit(
+        String memberId,
+        String groupInstanceId,
+        int memberEpoch,
+        boolean isTransactional,
+        short apiVersion
+    ) {
+        throw new UnsupportedOperationException("validateOffsetCommit is not 
supported for Share Groups.");
+    }
+
+    @Override
+    public void validateOffsetFetch(
+        String memberId,
+        int memberEpoch,
+        long lastCommittedOffset
+    ) {
+        throw new UnsupportedOperationException("validateOffsetFetch is not 
supported for Share Groups.");
+    }
+
+    @Override
+    public void validateOffsetDelete() {
+        throw new UnsupportedOperationException("validateOffsetDelete is not 
supported for Share Groups.");
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (state() != ShareGroupState.EMPTY) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    @Override
+    public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
+        
records.add(CoordinatorRecordHelpers.newGroupEpochTombstoneRecord(groupId()));

Review Comment:
   This should actually be created a `ShareGroupMetadata` tombstone.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern.share;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.modern.ModernGroup;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A Share Group.
+ */
+public class ShareGroup extends ModernGroup<ShareGroupMember> {
+
+    private static final String PROTOCOL_TYPE = "share";
+
+    public enum ShareGroupState {
+        EMPTY("Empty"),
+        STABLE("Stable"),
+        DEAD("Dead"),
+        UNKNOWN("Unknown");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        ShareGroupState(String name) {
+            this.name = name;
+            this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<ShareGroupState> state;
+
+    public ShareGroup(
+        SnapshotRegistry snapshotRegistry,
+        String groupId
+    ) {
+        super(snapshotRegistry, groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, 
ShareGroupState.EMPTY);
+    }
+
+    /**
+     * @return The group type (Share).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.SHARE;
+    }
+
+    /**
+     * @return The group protocol type (share).
+     */
+    @Override
+    public String protocolType() {
+        return PROTOCOL_TYPE;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return The current state.
+     */
+    public ShareGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The current state based on committed offset.
+     */
+    public ShareGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * Gets or creates a member.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ShareGroupMember.
+     */
+    public ShareGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        ShareGroupMember member = members.get(memberId);
+        if (member != null) return member;
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId));
+        }
+
+        return new ShareGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Updates the member.
+     *
+     * @param newMember The new share group member.
+     */
+    @Override
+    public void updateMember(ShareGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+
+        ShareGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
newMember);
+        maybeUpdatePartitionEpoch(oldMember, newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        ShareGroupMember oldMember = members.remove(memberId);
+        maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
null);
+        maybeRemovePartitionEpoch(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    @Override
+    public void validateOffsetCommit(
+        String memberId,
+        String groupInstanceId,
+        int memberEpoch,
+        boolean isTransactional,
+        short apiVersion
+    ) {
+        throw new UnsupportedOperationException("validateOffsetCommit is not 
supported for Share Groups.");
+    }
+
+    @Override
+    public void validateOffsetFetch(
+        String memberId,
+        int memberEpoch,
+        long lastCommittedOffset
+    ) {
+        throw new UnsupportedOperationException("validateOffsetFetch is not 
supported for Share Groups.");
+    }
+
+    @Override
+    public void validateOffsetDelete() {
+        throw new UnsupportedOperationException("validateOffsetDelete is not 
supported for Share Groups.");
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (state() != ShareGroupState.EMPTY) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    @Override
+    public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
+        
records.add(CoordinatorRecordHelpers.newGroupEpochTombstoneRecord(groupId()));

Review Comment:
   And also, the GroupMetadataManager should be writing `ShareGroupMetadata` 
records in `shareGroupHeartbeat`, when we get to that.



##########
group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json:
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// KIP-932 is in development. This schema is subject to 
non-backwards-compatible changes.
+{
+  "type": "data",
+  "name": "ShareGroupMemberMetadataKey",
+  "validVersions": "10",

Review Comment:
   I would expect `ShareGroupMetadataKey` and `ShareGroupMetadataValue` in the 
KIP to be defined at the same time. I'd prefer not to leave gaps in the 
versions, so if we are introducing a subset of the KIP's record schemas, I 
suggest renumbering them in the KIP so we get a continuous sequence once the PR 
is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to