This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 199772adc5a KAFKA-19141; Persist topic id in OffsetCommit record
(#19683)
199772adc5a is described below
commit 199772adc5a758ef74ae3adc6a5da471e50bfb27
Author: David Jacot <[email protected]>
AuthorDate: Fri May 16 16:26:36 2025 +0200
KAFKA-19141; Persist topic id in OffsetCommit record (#19683)
This patch adds the `topicId` field to the `OffsetCommitValue` record as
a tagged field. It will later be used on the offset fetch path to ensure
that the persisted offset matches the requested one.
Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah
<[email protected]>, Lianet Magrans <[email protected]>
---
.../group/GroupCoordinatorRecordHelpers.java | 7 +-
.../kafka/coordinator/group/OffsetAndMetadata.java | 29 +++-
.../coordinator/group/OffsetMetadataManager.java | 1 +
.../common/message/OffsetCommitValue.json | 4 +-
.../group/GroupCoordinatorRecordHelpersTest.java | 33 +++-
.../coordinator/group/OffsetAndMetadataTest.java | 53 ++++--
.../group/OffsetExpirationConditionImplTest.java | 8 +-
.../group/OffsetMetadataManagerTest.java | 191 +++++++++------------
.../group/classic/ClassicGroupTest.java | 2 +-
.../group/modern/consumer/ConsumerGroupTest.java | 2 +-
.../group/streams/StreamsGroupTest.java | 2 +-
.../tools/consumer/OffsetMessageFormatterTest.java | 8 +-
12 files changed, 195 insertions(+), 145 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 457d090fc53..ad17bfe34f2 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -517,7 +517,8 @@ public class GroupCoordinatorRecordHelpers {
.setMetadata(offsetAndMetadata.metadata)
.setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
// Version 1 has a non-empty expireTimestamp field
-
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
+
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+ .setTopicId(offsetAndMetadata.topicId),
version
)
);
@@ -527,9 +528,7 @@ public class GroupCoordinatorRecordHelpers {
if (expireTimestampMs) {
return 1;
} else {
- // Serialize with the highest supported non-flexible version
- // until a tagged field is introduced or the version is bumped.
- return 3;
+ return 4;
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
index a5635f58ac5..b8c8ee28238 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@@ -65,12 +66,18 @@ public class OffsetAndMetadata {
*/
public final long recordOffset;
+ /**
+ * The topic id used to commit the offset.
+ */
+ public final Uuid topicId;
+
public OffsetAndMetadata(
long committedOffset,
OptionalInt leaderEpoch,
String metadata,
long commitTimestampMs,
- OptionalLong expireTimestampMs
+ OptionalLong expireTimestampMs,
+ Uuid topicId
) {
this(
-1L,
@@ -78,7 +85,8 @@ public class OffsetAndMetadata {
leaderEpoch,
metadata,
commitTimestampMs,
- expireTimestampMs
+ expireTimestampMs,
+ topicId
);
}
@@ -88,7 +96,8 @@ public class OffsetAndMetadata {
OptionalInt leaderEpoch,
String metadata,
long commitTimestampMs,
- OptionalLong expireTimestampMs
+ OptionalLong expireTimestampMs,
+ Uuid topicId
) {
this.recordOffset = recordOffset;
this.committedOffset = committedOffset;
@@ -96,6 +105,7 @@ public class OffsetAndMetadata {
this.metadata = Objects.requireNonNull(metadata);
this.commitTimestampMs = commitTimestampMs;
this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs);
+ this.topicId = topicId;
}
@Override
@@ -105,6 +115,7 @@ public class OffsetAndMetadata {
", metadata=" + metadata +
", commitTimestampMs=" + commitTimestampMs +
", expireTimestampMs=" + expireTimestampMs +
+ ", topicId=" + topicId +
", recordOffset=" + recordOffset +
')';
}
@@ -121,6 +132,7 @@ public class OffsetAndMetadata {
if (recordOffset != that.recordOffset) return false;
if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;
if (!Objects.equals(metadata, that.metadata)) return false;
+ if (!Objects.equals(topicId, that.topicId)) return false;
return Objects.equals(expireTimestampMs, that.expireTimestampMs);
}
@@ -129,6 +141,7 @@ public class OffsetAndMetadata {
int result = (int) (committedOffset ^ (committedOffset >>> 32));
result = 31 * result + (leaderEpoch != null ? leaderEpoch.hashCode() :
0);
result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
+ result = 31 * result + (topicId != null ? topicId.hashCode() : 0);
result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs
>>> 32));
result = 31 * result + (expireTimestampMs != null ?
expireTimestampMs.hashCode() : 0);
result = 31 * result + (int) (recordOffset ^ (recordOffset >>> 32));
@@ -148,7 +161,8 @@ public class OffsetAndMetadata {
ofSentinel(record.leaderEpoch()),
record.metadata(),
record.commitTimestamp(),
- ofSentinel(record.expireTimestamp())
+ ofSentinel(record.expireTimestamp()),
+ record.topicId()
);
}
@@ -156,6 +170,7 @@ public class OffsetAndMetadata {
* @return An OffsetAndMetadata created from an
OffsetCommitRequestPartition request.
*/
public static OffsetAndMetadata fromRequest(
+ Uuid topicId,
OffsetCommitRequestData.OffsetCommitRequestPartition partition,
long currentTimeMs,
OptionalLong expireTimestampMs
@@ -166,7 +181,8 @@ public class OffsetAndMetadata {
partition.committedMetadata() == null ?
OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
currentTimeMs,
- expireTimestampMs
+ expireTimestampMs,
+ topicId
);
}
@@ -183,7 +199,8 @@ public class OffsetAndMetadata {
partition.committedMetadata() == null ?
OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
currentTimeMs,
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
);
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 4629df32217..1f67d23246c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -641,6 +641,7 @@ public class OffsetMetadataManager {
.setErrorCode(Errors.NONE.code()));
final OffsetAndMetadata offsetAndMetadata =
OffsetAndMetadata.fromRequest(
+ topic.topicId(),
partition,
currentTimeMs,
expireTimestampMs
diff --git
a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
index 8f7d32d5447..d9eca6cebe8 100644
--- a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
+++ b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
@@ -32,6 +32,8 @@
{ "name": "commitTimestamp", "type": "int64", "versions": "0+",
"about": "The time at which the commit was added to the log."},
{ "name": "expireTimestamp", "type": "int64", "versions": "1", "default":
-1, "ignorable": true,
- "about": "The time at which the offset will expire."}
+ "about": "The time at which the offset will expire."},
+ { "name": "topicId", "type": "uuid", "versions": "4+", "taggedVersions":
"4+", "tag": 0, "ignorable": true,
+ "about": "The topic id of the committed offset."}
]
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 8bd0987ac46..be2e9df5fea 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -55,6 +55,8 @@ import
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -68,6 +70,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.stream.Stream;
import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
@@ -720,11 +723,19 @@ public class GroupCoordinatorRecordHelpersTest {
@Test
public void testOffsetCommitValueVersion() {
assertEquals((short) 1,
GroupCoordinatorRecordHelpers.offsetCommitValueVersion(true));
- assertEquals((short) 3,
GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false));
+ assertEquals((short) 4,
GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false));
}
- @Test
- public void testNewOffsetCommitRecord() {
+ private static Stream<Uuid> uuids() {
+ return Stream.of(
+ Uuid.ZERO_UUID,
+ Uuid.randomUuid()
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testNewOffsetCommitRecord(Uuid topicId) {
OffsetCommitKey key = new OffsetCommitKey()
.setGroup("group-id")
.setTopic("foo")
@@ -734,7 +745,8 @@ public class GroupCoordinatorRecordHelpersTest {
.setLeaderEpoch(10)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
- .setExpireTimestamp(-1L);
+ .setExpireTimestamp(-1L)
+ .setTopicId(topicId);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
key,
@@ -749,11 +761,14 @@ public class GroupCoordinatorRecordHelpersTest {
"foo",
1,
new OffsetAndMetadata(
+ -1L,
100L,
OptionalInt.of(10),
"metadata",
1234L,
- OptionalLong.empty())
+ OptionalLong.empty(),
+ topicId
+ )
));
value.setLeaderEpoch(-1);
@@ -767,7 +782,9 @@ public class GroupCoordinatorRecordHelpersTest {
OptionalInt.empty(),
"metadata",
1234L,
- OptionalLong.empty())
+ OptionalLong.empty(),
+ topicId
+ )
));
}
@@ -798,7 +815,9 @@ public class GroupCoordinatorRecordHelpersTest {
OptionalInt.of(10),
"metadata",
1234L,
- OptionalLong.of(5678L))
+ OptionalLong.of(5678L),
+ Uuid.ZERO_UUID
+ )
));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
index e6be1f27883..2cc585b06af 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -16,27 +16,33 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.OptionalInt;
import java.util.OptionalLong;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class OffsetAndMetadataTest {
@Test
public void testAttributes() {
+ Uuid topicId = Uuid.randomUuid();
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"metadata",
1234L,
- OptionalLong.of(5678L)
+ OptionalLong.of(5678L),
+ topicId
);
assertEquals(100L, offsetAndMetadata.committedOffset);
@@ -44,16 +50,26 @@ public class OffsetAndMetadataTest {
assertEquals("metadata", offsetAndMetadata.metadata);
assertEquals(1234L, offsetAndMetadata.commitTimestampMs);
assertEquals(OptionalLong.of(5678L),
offsetAndMetadata.expireTimestampMs);
+ assertEquals(topicId, offsetAndMetadata.topicId);
}
- @Test
- public void testFromRecord() {
+ private static Stream<Uuid> uuids() {
+ return Stream.of(
+ Uuid.ZERO_UUID,
+ Uuid.randomUuid()
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testFromRecord(Uuid uuid) {
OffsetCommitValue record = new OffsetCommitValue()
.setOffset(100L)
.setLeaderEpoch(-1)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
- .setExpireTimestamp(-1L);
+ .setExpireTimestamp(-1L)
+ .setTopicId(uuid);
assertEquals(new OffsetAndMetadata(
10L,
@@ -61,7 +77,8 @@ public class OffsetAndMetadataTest {
OptionalInt.empty(),
"metadata",
1234L,
- OptionalLong.empty()
+ OptionalLong.empty(),
+ uuid
), OffsetAndMetadata.fromRecord(10L, record));
record
@@ -74,12 +91,14 @@ public class OffsetAndMetadataTest {
OptionalInt.of(12),
"metadata",
1234L,
- OptionalLong.of(5678L)
+ OptionalLong.of(5678L),
+ uuid
), OffsetAndMetadata.fromRecord(11L, record));
}
- @Test
- public void testFromRequest() {
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testFromRequest(Uuid uuid) {
MockTime time = new MockTime();
OffsetCommitRequestData.OffsetCommitRequestPartition partition =
@@ -95,8 +114,10 @@ public class OffsetAndMetadataTest {
OptionalInt.empty(),
"",
time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ uuid
), OffsetAndMetadata.fromRequest(
+ uuid,
partition,
time.milliseconds(),
OptionalLong.empty()
@@ -113,8 +134,10 @@ public class OffsetAndMetadataTest {
OptionalInt.of(10),
"hello",
time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ uuid
), OffsetAndMetadata.fromRequest(
+ uuid,
partition,
time.milliseconds(),
OptionalLong.empty()
@@ -127,8 +150,10 @@ public class OffsetAndMetadataTest {
OptionalInt.of(10),
"hello",
time.milliseconds(),
- OptionalLong.of(5678L)
+ OptionalLong.of(5678L),
+ uuid
), OffsetAndMetadata.fromRequest(
+ uuid,
partition,
time.milliseconds(),
OptionalLong.of(5678L)
@@ -153,7 +178,8 @@ public class OffsetAndMetadataTest {
OptionalInt.empty(),
"",
time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds()
@@ -170,7 +196,8 @@ public class OffsetAndMetadataTest {
OptionalInt.of(10),
"hello",
time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds()
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
index 2f1cb354a5a..63bbad94467 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.Uuid;
+
import org.junit.jupiter.api.Test;
import java.util.OptionalInt;
@@ -39,7 +41,8 @@ public class OffsetExpirationConditionImplTest {
OptionalInt.of(1),
"metadata",
commitTimestamp,
- expireTimestampMs
+ expireTimestampMs,
+ Uuid.ZERO_UUID
);
// Test when expire timestamp exists (older versions with per
partition retention)
@@ -56,7 +59,8 @@ public class OffsetExpirationConditionImplTest {
OptionalInt.of(1),
"metadata",
commitTimestamp,
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
);
// 3. Current timestamp - base timestamp >= offsets retention =>
should expire
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index e90927cfdd7..d3ddef15773 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -68,6 +68,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.net.InetAddress;
import java.time.Duration;
@@ -80,6 +81,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.stream.Stream;
import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
@@ -449,7 +451,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(leaderEpoch),
"metadata",
commitTimestamp,
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
)
));
}
@@ -580,6 +583,13 @@ public class OffsetMetadataManagerTest {
}
}
+ private static Stream<Uuid> uuids() {
+ return Stream.of(
+ Uuid.ZERO_UUID,
+ Uuid.randomUuid()
+ );
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testOffsetCommitWithUnknownGroup(short version) {
@@ -906,7 +916,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"",
context.time.milliseconds(),
- OptionalLong.of(context.time.milliseconds() + 1234L)
+ OptionalLong.of(context.time.milliseconds() + 1234L),
+ Uuid.ZERO_UUID
)
)),
result.records()
@@ -971,8 +982,9 @@ public class OffsetMetadataManagerTest {
assertFalse(group.hasMember(member.memberId()));
}
- @Test
- public void testSimpleGroupOffsetCommit() {
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testSimpleGroupOffsetCommit(Uuid topicId) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result
= context.commitOffset(
@@ -981,6 +993,7 @@ public class OffsetMetadataManagerTest {
.setTopics(List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
+ .setTopicId(topicId)
.setPartitions(List.of(
new
OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -994,6 +1007,7 @@ public class OffsetMetadataManagerTest {
.setTopics(List.of(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName("bar")
+ .setTopicId(topicId)
.setPartitions(List.of(
new
OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1013,7 +1027,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ topicId
)
)),
result.records()
@@ -1072,7 +1087,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
)
)),
result.records()
@@ -1235,15 +1251,17 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
)
)),
result.records()
);
}
- @Test
- public void testConsumerGroupOffsetCommit() {
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testConsumerGroupOffsetCommit(Uuid topicId) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
@@ -1267,74 +1285,7 @@ public class OffsetMetadataManagerTest {
.setTopics(List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
- .setPartitions(List.of(
- new
OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(100L)
- .setCommittedLeaderEpoch(10)
- .setCommittedMetadata("metadata")
- ))
- ))
- );
-
- assertEquals(
- new OffsetCommitResponseData()
- .setTopics(List.of(
- new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setName("bar")
- .setPartitions(List.of(
- new
OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code())
- ))
- )),
- result.response()
- );
-
- assertEquals(
- List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
- "foo",
- "bar",
- 0,
- new OffsetAndMetadata(
- 100L,
- OptionalInt.of(10),
- "metadata",
- context.time.milliseconds(),
- OptionalLong.empty()
- )
- )),
- result.records()
- );
- }
-
- @Test
- public void testConsumerGroupOffsetCommitWithTopicIds() {
- Uuid topicId = Uuid.randomUuid();
- OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
-
- // Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
- "foo",
- true
- );
-
- // Add member.
- group.updateMember(new ConsumerGroupMember.Builder("member")
- .setMemberEpoch(10)
- .setPreviousMemberEpoch(10)
- .build()
- );
-
- CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result
= context.commitOffset(
- new OffsetCommitRequestData()
- .setGroupId("foo")
- .setMemberId("member")
- .setGenerationIdOrMemberEpoch(10)
- .setTopics(List.of(
- new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(topicId)
- .setName("bar")
.setPartitions(List.of(
new
OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1349,8 +1300,8 @@ public class OffsetMetadataManagerTest {
new OffsetCommitResponseData()
.setTopics(List.of(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setTopicId(topicId)
.setName("bar")
+ .setTopicId(topicId)
.setPartitions(List.of(
new
OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1370,7 +1321,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"metadata",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ topicId
)
)),
result.records()
@@ -1446,7 +1398,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
)
)),
result.records()
@@ -1512,7 +1465,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"metadata",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
)
)),
result.records()
@@ -1669,7 +1623,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"metadata",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
)
)),
result.records()
@@ -2766,8 +2721,9 @@ public class OffsetMetadataManagerTest {
.setMetadata("");
}
- @Test
- public void testReplay() {
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testReplay(Uuid topicId) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
@@ -2776,7 +2732,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ topicId
));
verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
@@ -2785,7 +2742,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ topicId
));
verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata(
@@ -2794,7 +2752,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ topicId
));
verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata(
@@ -2803,7 +2762,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.of(10),
"small",
context.time.milliseconds(),
- OptionalLong.of(12345L)
+ OptionalLong.of(12345L),
+ topicId
));
}
@@ -2817,7 +2777,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 5, "foo", "bar", 1, new
OffsetAndMetadata(
@@ -2826,7 +2787,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 5, "bar", "zar", 0, new
OffsetAndMetadata(
@@ -2835,7 +2797,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 5, "bar", "zar", 1, new
OffsetAndMetadata(
@@ -2844,7 +2807,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 6, "foo", "bar", 2, new
OffsetAndMetadata(
@@ -2853,7 +2817,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 6, "foo", "bar", 3, new
OffsetAndMetadata(
@@ -2862,7 +2827,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
}
@@ -2877,7 +2843,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 10L, "foo", "bar", 0, new
OffsetAndMetadata(
@@ -2886,7 +2853,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
verifyTransactionalReplay(context, 10L, "foo", "bar", 1, new
OffsetAndMetadata(
@@ -2895,7 +2863,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Delete the offsets.
@@ -2927,7 +2896,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Add pending transactional commit for producer id 5.
@@ -2937,7 +2907,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Add pending transactional commit for producer id 6.
@@ -2947,7 +2918,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Replaying an end marker with an unknown producer id should not fail.
@@ -2971,7 +2943,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
), context.offsetMetadataManager.offset(
"foo",
"bar",
@@ -3007,7 +2980,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Add regular offset commit.
@@ -3017,7 +2991,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Replaying an end marker to commit transaction of producer id 5.
@@ -3039,7 +3014,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
), context.offsetMetadataManager.offset(
"foo",
"bar",
@@ -3058,7 +3034,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Add pending transactional commit for producer id 5.
@@ -3068,7 +3045,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Add pending transactional commit for producer id 6.
@@ -3078,7 +3056,8 @@ public class OffsetMetadataManagerTest {
OptionalInt.empty(),
"small",
context.time.milliseconds(),
- OptionalLong.empty()
+ OptionalLong.empty(),
+ Uuid.ZERO_UUID
));
// Commit all the transactions.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index a8b4d639e1c..dfcb415fd3e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1124,7 +1124,7 @@ public class ClassicGroupTest {
long currentTimestamp = 30000L;
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
MockTime time = new MockTime();
long currentStateTimestamp = time.milliseconds();
ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, time);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 333df21d9c3..a6b91e5a83b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1198,7 +1198,7 @@ public class ConsumerGroupTest {
long currentTimestamp = 30000L;
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new
LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index e9038b33402..99e13bbf155 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -791,7 +791,7 @@ public class StreamsGroupTest {
long currentTimestamp = 30000L;
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new
SnapshotRegistry(LOG_CONTEXT), "group-id",
mock(GroupCoordinatorMetricsShard.class));
Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
index 331b252dcc4..d774d0fed79 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.tools.consumer;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
@@ -28,7 +29,6 @@ import java.util.Collections;
import java.util.stream.Stream;
public class OffsetMessageFormatterTest extends
CoordinatorRecordMessageFormatterTest {
-
private static final OffsetCommitKey OFFSET_COMMIT_KEY = new
OffsetCommitKey()
.setGroup("group-id")
.setTopic("foo")
@@ -38,7 +38,8 @@ public class OffsetMessageFormatterTest extends
CoordinatorRecordMessageFormatte
.setLeaderEpoch(10)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
- .setExpireTimestamp(5678L);
+ .setExpireTimestamp(5678L)
+ .setTopicId(Uuid.fromString("MKXx1fIkQy2J9jXHhK8m1w"));
private static final GroupMetadataKey GROUP_METADATA_KEY = new
GroupMetadataKey().setGroup("group-id");
private static final GroupMetadataValue GROUP_METADATA_VALUE = new
GroupMetadataValue()
.setProtocolType("consumer")
@@ -121,7 +122,8 @@ public class OffsetMessageFormatterTest extends
CoordinatorRecordMessageFormatte
"data":{"offset":100,
"leaderEpoch":10,
"metadata":"metadata",
- "commitTimestamp":1234}}}
+ "commitTimestamp":1234,
+ "topicId":"MKXx1fIkQy2J9jXHhK8m1w"}}}
"""
),
Arguments.of(