This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2528dd41169 KAFKA-14499: [2/N] Add OffsetCommit record & related
(#14047)
2528dd41169 is described below
commit 2528dd41169a95f05117ba01302c5dcce0642ab1
Author: David Jacot <[email protected]>
AuthorDate: Fri Jul 21 20:09:06 2023 +0200
KAFKA-14499: [2/N] Add OffsetCommit record & related (#14047)
This patch does a few things:
1) It introduces the `OffsetAndMetadata` class, which holds the committed
offsets in the group coordinator.
2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of
the OffsetCommit value record that should be used.
Reviewers: Jeff Kim <[email protected]>, David Arthur
<[email protected]>, Justine Olshan <[email protected]>
---
checkstyle/import-control.xml | 1 +
.../kafka/coordinator/group/OffsetAndMetadata.java | 125 +++++++++++++++++++++
.../kafka/coordinator/group/RecordHelpers.java | 69 ++++++++++++
.../coordinator/group/OffsetAndMetadataTest.java | 74 ++++++++++++
.../kafka/coordinator/group/RecordHelpersTest.java | 108 ++++++++++++++++++
.../kafka/server/common/MetadataVersion.java | 12 ++
.../kafka/server/common/MetadataVersionTest.java | 20 ++++
7 files changed, 409 insertions(+)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9ae0cb12449..7ecc4d5a1d1 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -229,6 +229,7 @@
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
new file mode 100644
index 00000000000..b59933a249d
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Holds an offset committed by a group together with the metadata stored with it.
+ * All fields are final and are assigned once by the constructor.
+ */
+public class OffsetAndMetadata {
+    public static final String NO_METADATA = "";
+
+    /**
+     * The committed offset.
+     */
+    public final long offset;
+
+    /**
+     * The leader epoch in use when the offset was committed, if any.
+     */
+    public final OptionalInt leaderEpoch;
+
+    /**
+     * The committed metadata. The Kafka offset commit API lets users attach a string
+     * to every commit; it can be used, for example, to record which node made the
+     * commit or at what time the commit was made.
+     */
+    public final String metadata;
+
+    /**
+     * The commit timestamp in milliseconds.
+     */
+    public final long commitTimestampMs;
+
+    /**
+     * The expire timestamp in milliseconds, if any.
+     */
+    public final OptionalLong expireTimestampMs;
+
+    /**
+     * @param offset            The committed offset.
+     * @param leaderEpoch       The leader epoch; must not be null.
+     * @param metadata          The committed metadata; must not be null.
+     * @param commitTimestampMs The commit timestamp in milliseconds.
+     * @param expireTimestampMs The expire timestamp in milliseconds; must not be null.
+     * @throws NullPointerException if leaderEpoch, metadata or expireTimestampMs is null.
+     */
+    public OffsetAndMetadata(
+        long offset,
+        OptionalInt leaderEpoch,
+        String metadata,
+        long commitTimestampMs,
+        OptionalLong expireTimestampMs
+    ) {
+        this.offset = offset;
+        this.leaderEpoch = Objects.requireNonNull(leaderEpoch);
+        this.metadata = Objects.requireNonNull(metadata);
+        this.commitTimestampMs = commitTimestampMs;
+        this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("OffsetAndMetadata(offset=").append(offset)
+            .append(", leaderEpoch=").append(leaderEpoch)
+            .append(", metadata=").append(metadata)
+            .append(", commitTimestampMs=").append(commitTimestampMs)
+            .append(", expireTimestampMs=").append(expireTimestampMs)
+            .append(')')
+            .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final OffsetAndMetadata other = (OffsetAndMetadata) o;
+        // Primitive fields are compared first, then the reference fields.
+        return offset == other.offset
+            && commitTimestampMs == other.commitTimestampMs
+            && leaderEpoch.equals(other.leaderEpoch)
+            && metadata.equals(other.metadata)
+            && expireTimestampMs.equals(other.expireTimestampMs);
+    }
+
+    @Override
+    public int hashCode() {
+        // Long.hashCode(v) is defined as (int) (v ^ (v >>> 32)).
+        int hash = Long.hashCode(offset);
+        hash = 31 * hash + leaderEpoch.hashCode();
+        hash = 31 * hash + metadata.hashCode();
+        hash = 31 * hash + Long.hashCode(commitTimestampMs);
+        hash = 31 * hash + expireTimestampMs.hashCode();
+        return hash;
+    }
+
+    /**
+     * Converts an OffsetCommitValue record into an OffsetAndMetadata. A leader epoch
+     * equal to RecordBatch.NO_PARTITION_LEADER_EPOCH and an expire timestamp equal to
+     * OffsetCommitRequest.DEFAULT_TIMESTAMP are both mapped to an empty optional.
+     *
+     * @param record The OffsetCommitValue record to convert.
+     * @return An OffsetAndMetadata created from an OffsetCommitValue record.
+     */
+    public static OffsetAndMetadata fromRecord(
+        OffsetCommitValue record
+    ) {
+        final int epoch = record.leaderEpoch();
+        final OptionalInt leaderEpoch = epoch == RecordBatch.NO_PARTITION_LEADER_EPOCH
+            ? OptionalInt.empty()
+            : OptionalInt.of(epoch);
+
+        final long expireTimestamp = record.expireTimestamp();
+        final OptionalLong expireTimestampMs = expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP
+            ? OptionalLong.empty()
+            : OptionalLong.of(expireTimestamp);
+
+        return new OffsetAndMetadata(
+            record.offset(),
+            leaderEpoch,
+            record.metadata(),
+            record.commitTimestamp(),
+            expireTimestampMs
+        );
+    }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
index 863be1070a4..14a55b87324 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
@@ -17,6 +17,8 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
@@ -33,6 +35,8 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -467,6 +471,71 @@ public class RecordHelpers {
);
}
+ /**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partitionId The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion The metadata version.
+ * @return The record.
+ */
+ public static Record newOffsetCommitRecord(
+ String groupId,
+ String topic,
+ int partitionId,
+ OffsetAndMetadata offsetAndMetadata,
+ MetadataVersion metadataVersion
+ ) {
+ short version =
metadataVersion.offsetCommitValueVersion(offsetAndMetadata.expireTimestampMs.isPresent());
+
+ return new Record(
+ new ApiMessageAndVersion(
+ new OffsetCommitKey()
+ .setGroup(groupId)
+ .setTopic(topic)
+ .setPartition(partitionId),
+ (short) 1
+ ),
+ new ApiMessageAndVersion(
+ new OffsetCommitValue()
+ .setOffset(offsetAndMetadata.offset)
+
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+ .setMetadata(offsetAndMetadata.metadata)
+ .setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
+ // Version 1 has a non-empty expireTimestamp field
+
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
+ version
+ )
+ );
+ }
+
+ /**
+ * Creates an OffsetCommit tombstone record.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partitionId The partition id.
+ * @return The record.
+ */
+ public static Record newOffsetCommitTombstoneRecord(
+ String groupId,
+ String topic,
+ int partitionId
+ ) {
+ return new Record(
+ new ApiMessageAndVersion(
+ new OffsetCommitKey()
+ .setGroup(groupId)
+ .setTopic(topic)
+ .setPartition(partitionId),
+ (short) 1
+ ),
+ null
+ );
+ }
+
private static
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
new file mode 100644
index 00000000000..adac36b8ad1
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.junit.jupiter.api.Test;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetAndMetadataTest {
+    // The constructor must expose its arguments unchanged through the public fields.
+    @Test
+    public void testAttributes() {
+        final OptionalInt leaderEpoch = OptionalInt.of(10);
+        final OptionalLong expireTimestampMs = OptionalLong.of(5678L);
+
+        final OffsetAndMetadata committed = new OffsetAndMetadata(
+            100L,
+            leaderEpoch,
+            "metadata",
+            1234L,
+            expireTimestampMs
+        );
+
+        assertEquals(100L, committed.offset);
+        assertEquals(OptionalInt.of(10), committed.leaderEpoch);
+        assertEquals("metadata", committed.metadata);
+        assertEquals(1234L, committed.commitTimestampMs);
+        assertEquals(OptionalLong.of(5678L), committed.expireTimestampMs);
+    }
+
+    // fromRecord must map -1 to empty optionals and keep any other value.
+    @Test
+    public void testFromRecord() {
+        final OffsetCommitValue record = new OffsetCommitValue()
+            .setOffset(100L)
+            .setLeaderEpoch(-1)
+            .setMetadata("metadata")
+            .setCommitTimestamp(1234L)
+            .setExpireTimestamp(-1L);
+
+        final OffsetAndMetadata expectedWithoutOptionals = new OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "metadata",
+            1234L,
+            OptionalLong.empty()
+        );
+        assertEquals(expectedWithoutOptionals, OffsetAndMetadata.fromRecord(record));
+
+        // Reuse the same record with a real leader epoch and expire timestamp.
+        record.setLeaderEpoch(12);
+        record.setExpireTimestamp(5678L);
+
+        final OffsetAndMetadata expectedWithOptionals = new OffsetAndMetadata(
+            100L,
+            OptionalInt.of(12),
+            "metadata",
+            1234L,
+            OptionalLong.of(5678L)
+        );
+        assertEquals(expectedWithOptionals, OffsetAndMetadata.fromRecord(record));
+    }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
index 2b436672b0e..accda00808a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
@@ -40,6 +40,8 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
@@ -48,6 +50,7 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
@@ -59,6 +62,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Stream;
@@ -648,4 +653,107 @@ public class RecordHelpersTest {
assertEquals(expectedRecord, groupMetadataRecord);
}
+
+    // Without an expire timestamp, the value version follows the metadata version and
+    // the leader epoch is written as given, or as -1 when it is absent.
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testNewOffsetCommitRecord(MetadataVersion metadataVersion) {
+        final ApiMessageAndVersion expectedKey = new ApiMessageAndVersion(
+            new OffsetCommitKey()
+                .setGroup("group-id")
+                .setTopic("foo")
+                .setPartition(1),
+            (short) 1
+        );
+        final short expectedValueVersion = metadataVersion.offsetCommitValueVersion(false);
+
+        // Case 1: a leader epoch is provided.
+        final Record expectedWithLeaderEpoch = new Record(
+            expectedKey,
+            new ApiMessageAndVersion(
+                new OffsetCommitValue()
+                    .setOffset(100L)
+                    .setLeaderEpoch(10)
+                    .setMetadata("metadata")
+                    .setCommitTimestamp(1234L)
+                    .setExpireTimestamp(-1L),
+                expectedValueVersion
+            )
+        );
+        final OffsetAndMetadata withLeaderEpoch = new OffsetAndMetadata(
+            100L,
+            OptionalInt.of(10),
+            "metadata",
+            1234L,
+            OptionalLong.empty()
+        );
+        assertEquals(
+            expectedWithLeaderEpoch,
+            RecordHelpers.newOffsetCommitRecord("group-id", "foo", 1, withLeaderEpoch, metadataVersion)
+        );
+
+        // Case 2: no leader epoch is provided.
+        final Record expectedWithoutLeaderEpoch = new Record(
+            expectedKey,
+            new ApiMessageAndVersion(
+                new OffsetCommitValue()
+                    .setOffset(100L)
+                    .setLeaderEpoch(-1)
+                    .setMetadata("metadata")
+                    .setCommitTimestamp(1234L)
+                    .setExpireTimestamp(-1L),
+                expectedValueVersion
+            )
+        );
+        final OffsetAndMetadata withoutLeaderEpoch = new OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "metadata",
+            1234L,
+            OptionalLong.empty()
+        );
+        assertEquals(
+            expectedWithoutLeaderEpoch,
+            RecordHelpers.newOffsetCommitRecord("group-id", "foo", 1, withoutLeaderEpoch, metadataVersion)
+        );
+    }
+
+    // With an expire timestamp, the value must be written with version 1 whatever
+    // the metadata version is.
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testNewOffsetCommitRecordWithExpireTimestamp(MetadataVersion metadataVersion) {
+        final OffsetCommitKey expectedKey = new OffsetCommitKey()
+            .setGroup("group-id")
+            .setTopic("foo")
+            .setPartition(1);
+        final OffsetCommitValue expectedValue = new OffsetCommitValue()
+            .setOffset(100L)
+            .setLeaderEpoch(10)
+            .setMetadata("metadata")
+            .setCommitTimestamp(1234L)
+            .setExpireTimestamp(5678L);
+
+        final Record expectedRecord = new Record(
+            new ApiMessageAndVersion(expectedKey, (short) 1),
+            // When expire timestamp is set, it is always version 1.
+            new ApiMessageAndVersion(expectedValue, (short) 1)
+        );
+
+        final OffsetAndMetadata committed = new OffsetAndMetadata(
+            100L,
+            OptionalInt.of(10),
+            "metadata",
+            1234L,
+            OptionalLong.of(5678L)
+        );
+
+        assertEquals(
+            expectedRecord,
+            RecordHelpers.newOffsetCommitRecord("group-id", "foo", 1, committed, metadataVersion)
+        );
+    }
+
+    // A tombstone carries the OffsetCommit key (version 1) and a null value.
+    @Test
+    public void testNewOffsetCommitTombstoneRecord() {
+        final OffsetCommitKey expectedKey = new OffsetCommitKey()
+            .setGroup("group-id")
+            .setTopic("foo")
+            .setPartition(1);
+        final Record expectedRecord = new Record(
+            new ApiMessageAndVersion(expectedKey, (short) 1),
+            null
+        );
+
+        assertEquals(
+            expectedRecord,
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1)
+        );
+    }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 0ccfcc06be1..17220e7dbf5 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -386,6 +386,18 @@ public enum MetadataVersion {
}
}
+    /**
+     * Returns the version of the OffsetCommit value record to write.
+     *
+     * @param expireTimestampMs Whether the committed offset carries an expire timestamp.
+     * @return 1 when an expire timestamp is present or this version is below IBP_2_1_IV0,
+     *         2 when this version is below IBP_2_1_IV1, and 3 otherwise.
+     */
+    public short offsetCommitValueVersion(boolean expireTimestampMs) {
+        if (expireTimestampMs) {
+            return 1;
+        }
+        if (isLessThan(MetadataVersion.IBP_2_1_IV0)) {
+            return 1;
+        }
+        if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
+            return 2;
+        }
+        // Serialize with the highest supported non-flexible version
+        // until a tagged field is introduced or the version is bumped.
+        return 3;
+    }
+
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index cb8424e3f77..faff3999ffb 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -328,4 +328,24 @@ class MetadataVersionTest {
}
assertEquals(expectedVersion,
metadataVersion.groupMetadataValueVersion());
}
+
+    // Without an expire timestamp: 3 from IBP_2_1_IV1 on, 2 from IBP_2_1_IV0 on, else 1.
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testOffsetCommitValueVersion(MetadataVersion metadataVersion) {
+        final boolean atLeast21Iv1 = metadataVersion.isAtLeast(MetadataVersion.IBP_2_1_IV1);
+        final boolean atLeast21Iv0 = metadataVersion.isAtLeast(MetadataVersion.IBP_2_1_IV0);
+
+        final short expectedVersion = (short) (atLeast21Iv1 ? 3 : (atLeast21Iv0 ? 2 : 1));
+
+        assertEquals(expectedVersion, metadataVersion.offsetCommitValueVersion(false));
+    }
+
+    // With an expire timestamp, every metadata version must yield value version 1.
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion metadataVersion) {
+        final short expectedVersion = 1;
+        assertEquals(expectedVersion, metadataVersion.offsetCommitValueVersion(true));
+    }
}