This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2528dd41169 KAFKA-14499: [2/N] Add OffsetCommit record & related 
(#14047)
2528dd41169 is described below

commit 2528dd41169a95f05117ba01302c5dcce0642ab1
Author: David Jacot <[email protected]>
AuthorDate: Fri Jul 21 20:09:06 2023 +0200

    KAFKA-14499: [2/N] Add OffsetCommit record & related (#14047)
    
    This patch does a few things:
    1) It introduces the `OffsetAndMetadata` class which holds the committed 
offsets in the group coordinator.
    2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
    3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of 
the OffsetCommit value record that should be used.
    
    Reviewers: Jeff Kim <[email protected]>, David Arthur 
<[email protected]>, Justine Olshan <[email protected]>
---
 checkstyle/import-control.xml                      |   1 +
 .../kafka/coordinator/group/OffsetAndMetadata.java | 125 +++++++++++++++++++++
 .../kafka/coordinator/group/RecordHelpers.java     |  69 ++++++++++++
 .../coordinator/group/OffsetAndMetadataTest.java   |  74 ++++++++++++
 .../kafka/coordinator/group/RecordHelpersTest.java | 108 ++++++++++++++++++
 .../kafka/server/common/MetadataVersion.java       |  12 ++
 .../kafka/server/common/MetadataVersionTest.java   |  20 ++++
 7 files changed, 409 insertions(+)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9ae0cb12449..7ecc4d5a1d1 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -229,6 +229,7 @@
       <allow pkg="org.apache.kafka.common.metadata" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.protocol" />
+      <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.coordinator.group" />
       <allow pkg="org.apache.kafka.deferred" />
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
new file mode 100644
index 00000000000..b59933a249d
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Represents a committed offset with its metadata.
+ */
+public class OffsetAndMetadata {
+    public static final String NO_METADATA = "";
+
+    /**
+     * The committed offset.
+     */
+    public final long offset;
+
+    /**
+     * The leader epoch in use when the offset was committed.
+     */
+    public final OptionalInt leaderEpoch;
+
+    /**
+     * The committed metadata. The Kafka offset commit API allows users to 
provide additional
+     * metadata (in the form of a string) when an offset is committed. This 
can be useful
+     * (for example) to store information about which node made the commit, 
what time the
+     * commit was made, etc.
+     */
+    public final String metadata;
+
+    /**
+     * The commit timestamp in milliseconds.
+     */
+    public final long commitTimestampMs;
+
+    /**
+     * The expire timestamp in milliseconds.
+     */
+    public final OptionalLong expireTimestampMs;
+
+    public OffsetAndMetadata(
+        long offset,
+        OptionalInt leaderEpoch,
+        String metadata,
+        long commitTimestampMs,
+        OptionalLong expireTimestampMs
+    ) {
+        this.offset = offset;
+        this.leaderEpoch = Objects.requireNonNull(leaderEpoch);
+        this.metadata = Objects.requireNonNull(metadata);
+        this.commitTimestampMs = commitTimestampMs;
+        this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs);
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetAndMetadata(offset=" + offset +
+            ", leaderEpoch=" + leaderEpoch +
+            ", metadata=" + metadata +
+            ", commitTimestampMs=" + commitTimestampMs +
+            ", expireTimestampMs=" + expireTimestampMs +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        OffsetAndMetadata that = (OffsetAndMetadata) o;
+
+        if (offset != that.offset) return false;
+        if (commitTimestampMs != that.commitTimestampMs) return false;
+        if (!leaderEpoch.equals(that.leaderEpoch)) return false;
+        if (!metadata.equals(that.metadata)) return false;
+        return expireTimestampMs.equals(that.expireTimestampMs);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) (offset ^ (offset >>> 32));
+        result = 31 * result + leaderEpoch.hashCode();
+        result = 31 * result + metadata.hashCode();
+        result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs 
>>> 32));
+        result = 31 * result + expireTimestampMs.hashCode();
+        return result;
+    }
+
+    /**
+     * @return An OffsetAndMetadata created from an OffsetCommitValue record.
+     */
+    public static OffsetAndMetadata fromRecord(
+        OffsetCommitValue record
+    ) {
+        return new OffsetAndMetadata(
+            record.offset(),
+            record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+                OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()),
+            record.metadata(),
+            record.commitTimestamp(),
+            record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
+                OptionalLong.empty() : 
OptionalLong.of(record.expireTimestamp())
+        );
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
index 863be1070a4..14a55b87324 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
@@ -33,6 +35,8 @@ import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.generic.GenericGroup;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -467,6 +471,71 @@ public class RecordHelpers {
         );
     }
 
+    /**
+     * Creates an OffsetCommit record.
+     *
+     * @param groupId           The group id.
+     * @param topic             The topic name.
+     * @param partitionId       The partition id.
+     * @param offsetAndMetadata The offset and metadata.
+     * @param metadataVersion   The metadata version.
+     * @return The record.
+     */
+    public static Record newOffsetCommitRecord(
+        String groupId,
+        String topic,
+        int partitionId,
+        OffsetAndMetadata offsetAndMetadata,
+        MetadataVersion metadataVersion
+    ) {
+        short version = 
metadataVersion.offsetCommitValueVersion(offsetAndMetadata.expireTimestampMs.isPresent());
+
+        return new Record(
+            new ApiMessageAndVersion(
+                new OffsetCommitKey()
+                    .setGroup(groupId)
+                    .setTopic(topic)
+                    .setPartition(partitionId),
+                (short) 1
+            ),
+            new ApiMessageAndVersion(
+                new OffsetCommitValue()
+                    .setOffset(offsetAndMetadata.offset)
+                    
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setMetadata(offsetAndMetadata.metadata)
+                    .setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
+                    // Version 1 has a non-empty expireTimestamp field
+                    
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
+                version
+            )
+        );
+    }
+
+    /**
+     * Creates an OffsetCommit tombstone record.
+     *
+     * @param groupId           The group id.
+     * @param topic             The topic name.
+     * @param partitionId       The partition id.
+     * @return The record.
+     */
+    public static Record newOffsetCommitTombstoneRecord(
+        String groupId,
+        String topic,
+        int partitionId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new OffsetCommitKey()
+                    .setGroup(groupId)
+                    .setTopic(topic)
+                    .setPartition(partitionId),
+                (short) 1
+            ),
+            null
+        );
+    }
+
     private static 
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
toTopicPartitions(
         Map<Uuid, Set<Integer>> topicPartitions
     ) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
new file mode 100644
index 00000000000..adac36b8ad1
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.junit.jupiter.api.Test;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetAndMetadataTest {
+    @Test
+    public void testAttributes() {
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+            100L,
+            OptionalInt.of(10),
+            "metadata",
+            1234L,
+            OptionalLong.of(5678L)
+        );
+
+        assertEquals(100L, offsetAndMetadata.offset);
+        assertEquals(OptionalInt.of(10), offsetAndMetadata.leaderEpoch);
+        assertEquals("metadata", offsetAndMetadata.metadata);
+        assertEquals(1234L, offsetAndMetadata.commitTimestampMs);
+        assertEquals(OptionalLong.of(5678L), 
offsetAndMetadata.expireTimestampMs);
+    }
+
+    @Test
+    public void testFromRecord() {
+        OffsetCommitValue record = new OffsetCommitValue()
+            .setOffset(100L)
+            .setLeaderEpoch(-1)
+            .setMetadata("metadata")
+            .setCommitTimestamp(1234L)
+            .setExpireTimestamp(-1L);
+
+        assertEquals(new OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "metadata",
+            1234L,
+            OptionalLong.empty()
+        ), OffsetAndMetadata.fromRecord(record));
+
+        record
+            .setLeaderEpoch(12)
+            .setExpireTimestamp(5678L);
+
+        assertEquals(new OffsetAndMetadata(
+            100L,
+            OptionalInt.of(12),
+            "metadata",
+            1234L,
+            OptionalLong.of(5678L)
+        ), OffsetAndMetadata.fromRecord(record));
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
index 2b436672b0e..accda00808a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
@@ -40,6 +40,8 @@ import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.generic.GenericGroup;
 import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
 import org.apache.kafka.coordinator.group.generic.GenericGroupState;
@@ -48,6 +50,7 @@ import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.nio.ByteBuffer;
@@ -59,6 +62,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.stream.Stream;
 
@@ -648,4 +653,107 @@ public class RecordHelpersTest {
 
         assertEquals(expectedRecord, groupMetadataRecord);
     }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testNewOffsetCommitRecord(MetadataVersion metadataVersion) {
+        OffsetCommitKey key = new OffsetCommitKey()
+            .setGroup("group-id")
+            .setTopic("foo")
+            .setPartition(1);
+        OffsetCommitValue value = new OffsetCommitValue()
+            .setOffset(100L)
+            .setLeaderEpoch(10)
+            .setMetadata("metadata")
+            .setCommitTimestamp(1234L)
+            .setExpireTimestamp(-1L);
+
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                key,
+                (short) 1),
+            new ApiMessageAndVersion(
+                value,
+                metadataVersion.offsetCommitValueVersion(false)
+            )
+        );
+
+        assertEquals(expectedRecord, RecordHelpers.newOffsetCommitRecord(
+            "group-id",
+            "foo",
+            1,
+            new OffsetAndMetadata(
+                100L,
+                OptionalInt.of(10),
+                "metadata",
+                1234L,
+                OptionalLong.empty()),
+            metadataVersion
+        ));
+
+        value.setLeaderEpoch(-1);
+
+        assertEquals(expectedRecord, RecordHelpers.newOffsetCommitRecord(
+            "group-id",
+            "foo",
+            1,
+            new OffsetAndMetadata(
+                100L,
+                OptionalInt.empty(),
+                "metadata",
+                1234L,
+                OptionalLong.empty()),
+            metadataVersion
+        ));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testNewOffsetCommitRecordWithExpireTimestamp(MetadataVersion 
metadataVersion) {
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new OffsetCommitKey()
+                    .setGroup("group-id")
+                    .setTopic("foo")
+                    .setPartition(1),
+                (short) 1),
+            new ApiMessageAndVersion(
+                new OffsetCommitValue()
+                    .setOffset(100L)
+                    .setLeaderEpoch(10)
+                    .setMetadata("metadata")
+                    .setCommitTimestamp(1234L)
+                    .setExpireTimestamp(5678L),
+                (short) 1 // When expire timestamp is set, it is always 
version 1.
+            )
+        );
+
+        assertEquals(expectedRecord, RecordHelpers.newOffsetCommitRecord(
+            "group-id",
+            "foo",
+            1,
+            new OffsetAndMetadata(
+                100L,
+                OptionalInt.of(10),
+                "metadata",
+                1234L,
+                OptionalLong.of(5678L)),
+            metadataVersion
+        ));
+    }
+
+    @Test
+    public void testNewOffsetCommitTombstoneRecord() {
+        Record expectedRecord = new Record(
+            new ApiMessageAndVersion(
+                new OffsetCommitKey()
+                    .setGroup("group-id")
+                    .setTopic("foo")
+                    .setPartition(1),
+                (short) 1),
+            null);
+
+        Record record = 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
+        assertEquals(expectedRecord, record);
+    }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 0ccfcc06be1..17220e7dbf5 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -386,6 +386,18 @@ public enum MetadataVersion {
         }
     }
 
+    public short offsetCommitValueVersion(boolean expireTimestampMs) {
+        if (isLessThan(MetadataVersion.IBP_2_1_IV0) || expireTimestampMs) {
+            return 1;
+        } else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
+            return 2;
+        } else {
+            // Serialize with the highest supported non-flexible version
+            // until a tagged field is introduced or the version is bumped.
+            return  3;
+        }
+    }
+
     private static final Map<String, MetadataVersion> IBP_VERSIONS;
     static {
         {
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index cb8424e3f77..faff3999ffb 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -328,4 +328,24 @@ class MetadataVersionTest {
         }
         assertEquals(expectedVersion, 
metadataVersion.groupMetadataValueVersion());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testOffsetCommitValueVersion(MetadataVersion metadataVersion) {
+        final short expectedVersion;
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_2_1_IV1)) {
+            expectedVersion = 3;
+        } else if (metadataVersion.isAtLeast(IBP_2_1_IV0)) {
+            expectedVersion = 2;
+        } else {
+            expectedVersion = 1;
+        }
+        assertEquals(expectedVersion, 
metadataVersion.offsetCommitValueVersion(false));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void 
testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion 
metadataVersion) {
+        assertEquals((short) 1, 
metadataVersion.offsetCommitValueVersion(true));
+    }
 }

Reply via email to