[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275444136


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275352752


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+if (time == null) time = Time.SYSTEM;
+
+if (groupMetadataManager == null) {
+throw new 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275350025


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -537,6 +554,55 @@ public void 
testGenericGroupOffsetCommitWithRetentionTime() {
 );
 }
 
+@Test
+public void testGenericGroupOffsetCommitMaintainsSession() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+// Create a group.
+GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+"foo",
+true
+);
+
+// Add member.
+GenericGroupMember member = mkGenericMember("member", 
Optional.empty());
+group.add(member);
+
+// Transition to next generation.
+group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+group.initNextGeneration();
+assertEquals(1, group.generationId());
+group.transitionTo(GenericGroupState.STABLE);
+
+// Schedule session timeout. This would be normally done when
+// the group transitions to stable.
+
context.groupMetadataManager.rescheduleGenericGroupMemberHeartbeat(group, 
member);
+
+// Advance time by half of the session timeout.
+assertEquals(Collections.emptyList(), context.sleep(5000 / 2));
+
+// Commit.
+context.commitOffset(
+new OffsetCommitRequestData()
+.setGroupId("foo")
+.setMemberId("member")
+.setGenerationIdOrMemberEpoch(1)
+.setRetentionTimeMs(1234L)
+.setTopics(Collections.singletonList(
+new OffsetCommitRequestData.OffsetCommitRequestTopic()
+.setName("bar")
+.setPartitions(Collections.singletonList(
+new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+.setPartitionIndex(0)
+.setCommittedOffset(100L)
+))
+))
+);
+
+// Advance time by half of the session timeout.
+assertEquals(Collections.emptyList(), context.sleep(5000 / 2));

Review Comment:
   nit: maybe make clear that no timeouts means the group is not expired? And 
then maybe have one more when the group does expire? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275334511


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275333908


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the 
member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(
+String memberId,
+String groupInstanceId,
+String operation
+) throws UnknownMemberIdException, FencedInstanceIdException {
+if (groupInstanceId != null) {
+String existingMemberId = staticMemberId(groupInstanceId);
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+} else if (!existingMemberId.equals(memberId)) {
+log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+ "is fenced by existing memberId={} during operation 
{}",
+memberId, groupInstanceId, existingMemberId, operation);
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+}
+
+if (!hasMemberId(memberId)) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+}
+
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param generationId  The generation id.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int generationId
+) throws CoordinatorNotAvailableException, UnknownMemberIdException, 
IllegalGenerationException, FencedInstanceIdException {
+if (isInState(DEAD)) {
+throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+}
+
+if (generationId < 0 && isInState(EMPTY)) {
+// When the generation id is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. 
In this case,
+// the request can commit offsets if the group is empty.
+return;
+}
+
+if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != 
null) {
+validateMember(memberId, groupInstanceId, "offset-commit");

Review Comment:
   Got it. The old code used the same code path for the different apis but I 
guess we won't do the same here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275332469


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -497,6 +499,30 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
 return metadataRefreshDeadline;
 }
 
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param memberEpoch   The member epoch.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int memberEpoch
+) throws UnknownMemberIdException, StaleMemberEpochException {
+// When the member epoch is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. In 
this case,
+// the request can commit offsets if the group is empty.
+if (memberEpoch < 0 && members().isEmpty()) return;

Review Comment:
   I was just curious why isInState(EMPTY) is used in the generic group but not 
for consumer group, but then I also saw we don't have such a method.  



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -497,6 +499,30 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
 return metadataRefreshDeadline;
 }
 
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param memberEpoch   The member epoch.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int memberEpoch
+) throws UnknownMemberIdException, StaleMemberEpochException {
+// When the member epoch is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. In 
this case,
+// the request can commit offsets if the group is empty.
+if (memberEpoch < 0 && members().isEmpty()) return;

Review Comment:
   I was just curious why isInState(EMPTY) is used in the generic group but not 
for consumer group, but then I also saw we don't have such a method.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275328506


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -114,12 +116,29 @@ public static OffsetAndMetadata fromRecord(
 ) {
 return new OffsetAndMetadata(
 record.offset(),
-record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()),
+ofSentinel(record.leaderEpoch()),
 record.metadata(),
 record.commitTimestamp(),
-record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
-OptionalLong.empty() : 
OptionalLong.of(record.expireTimestamp())
+ofSentinel(record.expireTimestamp())
+);
+}
+
+/**
+ * @return An OffsetAndMetadata created from an 
OffsetCommitRequestPartition request.
+ */
+public static OffsetAndMetadata fromRequest(
+OffsetCommitRequestData.OffsetCommitRequestPartition partition,
+long currentTimeMs,
+OptionalLong expireTimestampMs
+) {
+return new OffsetAndMetadata(
+partition.committedOffset(),
+ofSentinel(partition.committedLeaderEpoch()),
+partition.committedMetadata() == null ?
+OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
+partition.commitTimestamp() == 
OffsetCommitRequest.DEFAULT_TIMESTAMP ?

Review Comment:
   I think I was reading the diff wrong again -- I see it correctly now. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-26 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275322523


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274240171


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274240171


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274237674


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274235539


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274232061


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274232061


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -0,0 +1,1113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
+import org.apache.kafka.coordinator.group.generic.GenericGroupState;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetMetadataManagerTest {
+static class OffsetMetadataManagerTestContext {
+public static class Builder {
+final private MockTime time = new MockTime();
+final private LogContext logContext = new LogContext();
+final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+private MetadataImage metadataImage = null;
+private int offsetMetadataMaxSize = 4096;
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+OffsetMetadataManagerTestContext build() {
+if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+
+GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+.withTime(time)
+.withTimer(new MockCoordinatorTimer<>(time))
+.withSnapshotRegistry(snapshotRegistry)
+.withLogContext(logContext)
+.withMetadataImage(metadataImage)
+

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274227661


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -956,6 +961,72 @@ public void testMaybeElectNewJoinedLeaderChooseExisting() {
 assertTrue(group.isLeader(memberId));
 }
 
+@Test
+public void testValidateOffsetCommit() {
+// A call from the admin client without any parameters should pass.
+group.validateOffsetCommit("", "", -1);
+
+// Add a member.
+group.add(new GenericGroupMember(
+"member-id",
+Optional.of("instance-id"),
+"",
+"",
+100,
+100,
+"consumer",
+new JoinGroupRequestProtocolCollection(Collections.singletonList(
+new JoinGroupRequestProtocol()
+.setName("roundrobin")
+.setMetadata(new byte[0])).iterator())
+));
+
+group.transitionTo(PREPARING_REBALANCE);
+group.initNextGeneration();
+
+// No parameters and the group is not empty.
+assertThrows(UnknownMemberIdException.class,
+() -> group.validateOffsetCommit("", "", -1));
+
+// The member id does not exist.
+assertThrows(UnknownMemberIdException.class,
+() -> group.validateOffsetCommit("unknown", "unknown", -1));
+
+// The instance id does not exist.
+assertThrows(UnknownMemberIdException.class,
+() -> group.validateOffsetCommit("member-id", "unknown", -1));
+
+// The generation id does is invalid.

Review Comment:
   nit: comment seems to have an extra word



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274178895


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274177883


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the 
member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(
+String memberId,
+String groupInstanceId,
+String operation
+) throws UnknownMemberIdException, FencedInstanceIdException {
+if (groupInstanceId != null) {
+String existingMemberId = staticMemberId(groupInstanceId);
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+} else if (!existingMemberId.equals(memberId)) {
+log.info("Request memberId={} for static member with 
groupInstanceId={} " +
+ "is fenced by existing memberId={} during operation 
{}",
+memberId, groupInstanceId, existingMemberId, operation);
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+}
+
+if (!hasMemberId(memberId)) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+}
+
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param generationId  The generation id.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int generationId
+) throws CoordinatorNotAvailableException, UnknownMemberIdException, 
IllegalGenerationException, FencedInstanceIdException {
+if (isInState(DEAD)) {
+throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
+}
+
+if (generationId < 0 && isInState(EMPTY)) {
+// When the generation id is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. 
In this case,
+// the request can commit offsets if the group is empty.
+return;
+}
+
+if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != 
null) {
+validateMember(memberId, groupInstanceId, "offset-commit");

Review Comment:
   Do we not care to specify txn offset commits/do we expect to do them 
elsewhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274175859


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -738,6 +742,84 @@ public String generateMemberId(String clientId, 
Optional groupInstanceId
 .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + 
UUID.randomUUID());
 }
 
+/**
+ * Validates that (1) the group instance id exists and is mapped to the 
member id
+ * if the group instance id is provided; and (2) the member id exists in 
the group.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param operation The operation.
+ *
+ * @throws UnknownMemberIdException
+ * @throws FencedInstanceIdException
+ */
+public void validateMember(

Review Comment:
   do we want this to be public?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274174772


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -497,6 +499,30 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
 return metadataRefreshDeadline;
 }
 
+/**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId  The member id.
+ * @param groupInstanceId   The group instance id.
+ * @param memberEpoch   The member epoch.
+ */
+@Override
+public void validateOffsetCommit(
+String memberId,
+String groupInstanceId,
+int memberEpoch
+) throws UnknownMemberIdException, StaleMemberEpochException {
+// When the member epoch is -1, the request comes from either the 
admin client
+// or a consumer which does not use the group management facility. In 
this case,
+// the request can commit offsets if the group is empty.
+if (memberEpoch < 0 && members().isEmpty()) return;

Review Comment:
   are there cases where members.isEmpty is true but the state is not yet EMPTY?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274168232


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java:
##
@@ -239,6 +283,14 @@ public void replay(Record record) throws RuntimeException {
 ApiMessageAndVersion value = record.value();
 
 switch (key.version()) {
+case 0:

Review Comment:
   Do we expect this for keys 0 and 1? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274166277


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java:
##
@@ -193,6 +217,22 @@ public CoordinatorResult genericGroupJoin(
 );
 }
 
+/**
+ * Handled a OffsetCommit request.

Review Comment:
   nit: handles



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274140048


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public class Utils {
+private Utils() {}
+
+/**
+ * @return An OptionalInt containing the value iif the value is different 
from

Review Comment:
   nit: iff (same below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274140048


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public class Utils {
+private Utils() {}
+
+/**
+ * @return An OptionalInt containing the value iif the value is different 
from

Review Comment:
   nit: iff



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274139408


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -114,12 +116,29 @@ public static OffsetAndMetadata fromRecord(
 ) {
 return new OffsetAndMetadata(
 record.offset(),
-record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()),
+ofSentinel(record.leaderEpoch()),
 record.metadata(),
 record.commitTimestamp(),
-record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
-OptionalLong.empty() : 
OptionalLong.of(record.expireTimestamp())
+ofSentinel(record.expireTimestamp())
+);
+}
+
+/**
+ * @return An OffsetAndMetadata created from an 
OffsetCommitRequestPartition request.
+ */
+public static OffsetAndMetadata fromRequest(
+OffsetCommitRequestData.OffsetCommitRequestPartition partition,
+long currentTimeMs,
+OptionalLong expireTimestampMs
+) {
+return new OffsetAndMetadata(
+partition.committedOffset(),
+ofSentinel(partition.committedLeaderEpoch()),
+partition.committedMetadata() == null ?
+OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
+partition.commitTimestamp() == 
OffsetCommitRequest.DEFAULT_TIMESTAMP ?

Review Comment:
   Just for my understanding, if we have no timestamp/default in the request we 
use the current time, but in the record we just set the sentinel/empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274067065


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-25 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1274065371


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272624245


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272624245


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-24 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1272621431


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271176157


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271176157


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271173194


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -408,6 +408,31 @@ public MetadataImage image() {
 return metadataImage;
 }
 
+/**
+ *
+ * @param groupId
+ * @param createIfNotExists
+ * @return
+ * @throws GroupIdNotFoundException
+ */
+public Group getOrMaybeCreateSimpleGroup(

Review Comment:
   This naming is a bit confusing since I thought we only get the simple group. 
(Not any group)
   
   We should probably will in line 415 and that may help clear it up a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271172931


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271172931


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271172399


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.generic.GenericGroup;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+
+/**
+ * The OffsetMetadataManager manages the offsets of all the groups. It 
basically maintains
+ * a mapping from group id to topic-partition to offset. This class has two 
kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response 
and records to
+ *mutate the hard state. Those records will be written by the runtime and 
applied to the
+ *hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used 
in the request
+ *handling as well as during the initial loading of the records from the 
partitions.
+ */
+public class OffsetMetadataManager {
+public static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
+private GroupMetadataManager groupMetadataManager = null;
+private int offsetMetadataMaxSize = 4096;
+private MetadataImage metadataImage = null;
+
+Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder withTime(Time time) {
+this.time = time;
+return this;
+}
+
+Builder withGroupMetadataManager(GroupMetadataManager 
groupMetadataManager) {
+this.groupMetadataManager = groupMetadataManager;
+return this;
+}
+
+Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
+this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+return this;
+}
+
+Builder withMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
+return this;
+}
+
+public OffsetMetadataManager build() {
+if (logContext == null) logContext = new LogContext();
+if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+if (metadataImage == null) metadataImage = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14067: KAFKA-14499: [3/N] Implement OffsetCommit API

2023-07-21 Thread via GitHub


jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1271168880


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -471,9 +472,48 @@ public CompletableFuture 
commitOffsets(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (request.groupId() == null) {
+return 
CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse(
+request,
+Errors.INVALID_GROUP_ID
+));
+}
+
+return runtime.scheduleWriteOperation(
+"commit-offset",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.commitOffset(context, request)
+).exceptionally(exception -> {

Review Comment:
   This is pulled from here, right? 
https://github.com/apache/kafka/blob/1656591d0b339c385d0ba1f938fc94b52e29965d/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L437



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org