[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-27 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275938400


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275472419


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275411706


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275376886


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -497,6 +499,30 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
 return metadataRefreshDeadline;
 }
 
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param memberEpoch   The member epoch.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int memberEpoch
+) throws UnknownMemberIdException, StaleMemberEpochException {
+// When the member epoch is -1, the request comes from either the admin client
+// or a consumer which does not use the group management facility. In this case,
+// the request can commit offsets if the group is empty.
+if (memberEpoch < 0 && members().isEmpty()) return;

Review Comment:
   Yeah, that's right :).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275376592


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -537,6 +554,55 @@ public void testGenericGroupOffsetCommitWithRetentionTime() {
 );
 }
 
+@Test
+public void testGenericGroupOffsetCommitMaintainsSession() {
+OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+// Create a group.
+GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+"foo",
+true
+);
+
+// Add member.
+GenericGroupMember member = mkGenericMember("member", Optional.empty());
+group.add(member);
+
+// Transition to next generation.
+group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+group.initNextGeneration();
+assertEquals(1, group.generationId());
+group.transitionTo(GenericGroupState.STABLE);
+
+// Schedule session timeout. This would be normally done when
+// the group transitions to stable.
+context.groupMetadataManager.rescheduleGenericGroupMemberHeartbeat(group, member);
+
+// Advance time by half of the session timeout.
+assertEquals(Collections.emptyList(), context.sleep(5000 / 2));
+
+// Commit.
+context.commitOffset(
+new OffsetCommitRequestData()
+.setGroupId("foo")
+.setMemberId("member")
+.setGenerationIdOrMemberEpoch(1)
+.setRetentionTimeMs(1234L)
+.setTopics(Collections.singletonList(
+new OffsetCommitRequestData.OffsetCommitRequestTopic()
+.setName("bar")
+.setPartitions(Collections.singletonList(
+new OffsetCommitRequestData.OffsetCommitRequestPartition()
+.setPartitionIndex(0)
+.setCommittedOffset(100L)
+))
+))
+);
+
+// Advance time by half of the session timeout.
+assertEquals(Collections.emptyList(), context.sleep(5000 / 2));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275371089


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+if (time == null) time = Time.SYSTEM;
+
+if (groupMetadataManager == null) {
+throw new IllegalArgumentException("GroupMet

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275366606


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, Optional<String> groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the member id
+ * if the group instance id is provided; and (2) the member id exists in the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(
+String memberId,
+String groupInstanceId,
+String operation
+) throws UnknownMemberIdException, FencedInstanceIdException {
+if (groupInstanceId != null) {
+String existingMemberId = staticMemberId(groupInstanceId);
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+} else if (!existingMemberId.equals(memberId)) {
+log.info("Request memberId={} for static member with groupInstanceId={} " +
+ "is fenced by existing memberId={} during operation {}",
+memberId, groupInstanceId, existingMemberId, operation);
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+}
+
+if (!hasMemberId(memberId)) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+}
+
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param generationId  The generation id.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int generationId
+) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException {
+if (isInState(DEAD)) {
+throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+}
+
+if (generationId < 0 && isInState(EMPTY)) {
+// When the generation id is -1, the request comes from either the admin client
+// or a consumer which does not use the group management facility. In this case,
+// the request can commit offsets if the group is empty.
+return;
+}
+
+if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != null) {
+validateMember(memberId, groupInstanceId, "offset-commit");

Review Comment:
   We will see when we implement it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275365874


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274888778


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274856976


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274550319


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+.wi

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274546743


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+.wi

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274547512


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+.wi

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274535047


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional<String> groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the 
member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(
+String memberId,
+String groupInstanceId,
+String operation
+) throws UnknownMemberIdException, FencedInstanceIdException {
+if (groupInstanceId != null) {
+String existingMemberId = staticMemberId(groupInstanceId);
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+} else if (!existingMemberId.equals(memberId)) {
+log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+ "is fenced by existing memberId={} during operation 
{}",
+memberId, groupInstanceId, existingMemberId, operation);
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+}
+
+if (!hasMemberId(memberId)) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+}
+
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param generationId  The generation id.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int generationId
+) throws CoordinatorNotAvailableException, UnknownMemberIdException, 
IllegalGenerationException, FencedInstanceIdException {
+if (isInState(DEAD)) {
+throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+}
+
+if (generationId < 0 && isInState(EMPTY)) {
+// When the generation id is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. 
In this case,
+// the request can commit offsets if the group is empty.
+return;
+}
+
+if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != 
null) {
+validateMember(memberId, groupInstanceId, "offset-commit");

Review Comment:
   Transactional offset commits are done via another API which is not 
implemented yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274534100


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional<String> groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the 
member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(

Review Comment:
   Yes, because it is used in the GroupMetadataManager as well.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional<String> groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the 
member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(

Review Comment:
   Yes, because it is used by the GroupMetadataManager as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274533496


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -497,6 +499,30 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
 return metadataRefreshDeadline;
 }
 
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param memberEpoch   The member epoch.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int memberEpoch
+) throws UnknownMemberIdException, StaleMemberEpochException {
+// When the member epoch is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. In 
this case,
+// the request can commit offsets if the group is empty.
+if (memberEpoch < 0 && members().isEmpty()) return;

Review Comment:
   No. EMPTY is actually set based on this condition: 
https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java#L505.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274531530


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java:
##
@@ -239,6 +283,14 @@ public void replay(Record record) throws RuntimeException {
 ApiMessageAndVersion value = record.value();
 
 switch (key.version()) {
+case 0:

Review Comment:
   Yeah, this is actually correct. Weird but correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274530527


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -114,12 +116,29 @@ public static OffsetAndMetadata fromRecord(
 ) {
 return new OffsetAndMetadata(
 record.offset(),
-record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()),
+ofSentinel(record.leaderEpoch()),
 record.metadata(),
 record.commitTimestamp(),
-record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
-OptionalLong.empty() : 
OptionalLong.of(record.expireTimestamp())
+ofSentinel(record.expireTimestamp())
+);
+}
+
+/**
+ * @return An OffsetAndMetadata created from an 
OffsetCommitRequestPartition request.
+ */
+public static OffsetAndMetadata fromRequest(
+OffsetCommitRequestData.OffsetCommitRequestPartition partition,
+long currentTimeMs,
+OptionalLong expireTimestampMs
+) {
+return new OffsetAndMetadata(
+partition.committedOffset(),
+ofSentinel(partition.committedLeaderEpoch()),
+partition.committedMetadata() == null ?
+OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
+partition.commitTimestamp() == 
OffsetCommitRequest.DEFAULT_TIMESTAMP ?

Review Comment:
   In the record, we store the decided commit time (either the current time or 
the provided one). For the expire timestamp, however, we store a sentinel (-1) 
if it is not set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274528297


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274519871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274520457


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274517754


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -471,9 +472,48 @@ public CompletableFuture<OffsetCommitResponseData> commitOffsets(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (request.groupId() == null) {
+return 
CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
+request,
+Errors.INVALID_GROUP_ID
+));
+}
+
+return runtime.scheduleWriteOperation(
+"commit-offset",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.commitOffset(context, request)
+).exceptionally(exception -> {

Review Comment:
   Yeah, they are always slightly different. This is why I prefer to be 
explicit in the callback here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274516689


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273942395


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273709366


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273673069


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273672036


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273273570


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273270100


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273269009


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273267151


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273260781


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273258811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273253421


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273251270


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273249691


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -471,9 +472,48 @@ public CompletableFuture 
commitOffsets(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (request.groupId() == null) {
+return 
CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
+request,
+Errors.INVALID_GROUP_ID
+));
+}

Review Comment:
   Added: 
   ```
   // For backwards compatibility, we support offset commits for the empty 
groupId.
   ```
   
   I think that we can add the same to `DescribeGroups` and `DeleteGroups` but we 
don't have to mention them here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273247805


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273240983


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1273238804


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -471,9 +472,48 @@ public CompletableFuture 
commitOffsets(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (request.groupId() == null) {
+return 
CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
+request,
+Errors.INVALID_GROUP_ID
+));
+}
+
+return runtime.scheduleWriteOperation(
+"commit-offset",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.commitOffset(context, request)
+).exceptionally(exception -> {

Review Comment:
   The mapping is different, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r127323


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272116729


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -408,6 +408,31 @@ public MetadataImage image() {
 return metadataImage;
 }
 
+/**
+ *
+ * @param groupId
+ * @param createIfNotExists
+ * @return
+ * @throws GroupIdNotFoundException
+ */
+public Group getOrMaybeCreateSimpleGroup(

Review Comment:
   I have reworked this code. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272114680


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272031324


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272030875


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -408,6 +408,31 @@ public MetadataImage image() {
 return metadataImage;
 }
 
+/**
+ *
+ * @param groupId
+ * @param createIfNotExists
+ * @return
+ * @throws GroupIdNotFoundException
+ */
+public Group getOrMaybeCreateSimpleGroup(

Review Comment:
   Yeah, I agree with you. Let me rework this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272030627


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272025183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -471,9 +472,48 @@ public CompletableFuture 
commitOffsets(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (request.groupId() == null) {
+return 
CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
+request,
+Errors.INVALID_GROUP_ID
+));
+}
+
+return runtime.scheduleWriteOperation(
+"commit-offset",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.commitOffset(context, request)
+).exceptionally(exception -> {

Review Comment:
   Correct. Note that `NOT_ENOUGH_REPLICAS_AFTER_APPEND` is not handled in the new code 
because this error cannot happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1270960303


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1270959863


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM

[GitHub] [kafka] dajac commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


dajac commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1270959230


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EM