squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2921054377
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -624,6 +630,14 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setName(topic.name());
response.topics().add(topicResponse);
+ // Resolve topic ID if it's ZERO_UUID
+ final Uuid resolvedTopicId = topic.topicId().equals(Uuid.ZERO_UUID)
+ ? metadataImage
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID)
Review Comment:
The formatting is off here.
```suggestion
.topicMetadata(topic.name())
.map(CoordinatorMetadataImage.TopicMetadata::id)
.orElse(Uuid.ZERO_UUID)
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1273,6 +1287,15 @@ public void replayEndTransactionMarker(
}
}
+ /**
+ * Updates the metadata image.
+ *
+ * @param newImage The new metadata image.
+ */
+ public void onMetadataUpdate(CoordinatorMetadataImage newImage) {
Review Comment:
Could we copy `GroupMetadataManager`?
```suggestion
/**
* A new metadata image is available.
*
* @param delta The delta image.
* @param newImage The new metadata image.
*/
public void onMetadataUpdate(CoordinatorMetadataDelta delta,
CoordinatorMetadataImage newImage) {
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When assignment epoch(7) <= client epoch(9) <= broker
epoch(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 9, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 9,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10) , exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals("Received member epoch 11 is newer than current
member epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (8) != broker epoch (10) and client epoch (8)
> assignment epoch (7),
+ // stale member epoch exception thrown from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7
for partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 10, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10), exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals(
+ "Received member epoch 11 is newer than current member
epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
+
+ // When partition epoch (7) <= client epoch (7) <= broker epoch
(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6)
< partition epoch (7),
+ // exception thrown directly from validateOffsetCommit.
Review Comment:
This comment doesn't match the check. The exception comes from the validator.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When assignment epoch(7) <= client epoch(9) <= broker
epoch(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 9, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 9,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10) , exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals("Received member epoch 11 is newer than current
member epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (8) != broker epoch (10) and client epoch (8)
> assignment epoch (7),
+ // stale member epoch exception thrown from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7
for partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 10, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10), exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals(
+ "Received member epoch 11 is newer than current member
epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
+
+ // When partition epoch (7) <= client epoch (7) <= broker epoch
(10), no exception thrown.
Review Comment:
```suggestion
// When partition epoch (7) <= client epoch (7) <= broker epoch
(10), no exception thrown.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1419,6 +1429,59 @@ private static void verifyOffsetCommit(Uuid topicId,
OffsetMetadataManagerTestCo
);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(new
KRaftCoordinatorMetadataImage(metadataImage))
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+ .build()
+ );
+
+ OffsetCommitRequestData request = new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(barTopicName)
+ .setTopicId(Uuid.ZERO_UUID)
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ));
+
+ // When client epoch (3) < assignment epoch (5), exception should be
thrown.
+ request.setGenerationIdOrMemberEpoch(3);
+ assertThrows(StaleMemberEpochException.class, () ->
context.commitOffset(request));
+
+ // When client epoch (5) >= assignment epoch (5), commit should
succeed.
+ request.setGenerationIdOrMemberEpoch(5);
+ assertDoesNotThrow(() -> context.commitOffset(request));
+ CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> resp =
context.commitOffset(request);
+ assertEquals(1, resp.response().topics().size());
+ assertEquals(barTopicName, resp.response().topics().get(0).name());
Review Comment:
I think we should also check the error code for the partition. The rest of
the tests write the checks as:
```suggestion
assertEquals(
new OffsetCommitResponseData()
.setTopics(List.of(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName(barTopicName)
.setTopicId(barTopicId)
.setPartitions(List.of(
new
OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
))
)),
result.response()
);
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1419,6 +1429,59 @@ private static void verifyOffsetCommit(Uuid topicId,
OffsetMetadataManagerTestCo
);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(new
KRaftCoordinatorMetadataImage(metadataImage))
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+ .build()
+ );
+
+ OffsetCommitRequestData request = new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(barTopicName)
+ .setTopicId(Uuid.ZERO_UUID)
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ));
+
+ // When client epoch (3) < assignment epoch (5), exception should be
thrown.
+ request.setGenerationIdOrMemberEpoch(3);
+ assertThrows(StaleMemberEpochException.class, () ->
context.commitOffset(request));
+
+ // When client epoch (5) >= assignment epoch (5), commit should
succeed.
+ request.setGenerationIdOrMemberEpoch(5);
+ assertDoesNotThrow(() -> context.commitOffset(request));
+ CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> resp =
context.commitOffset(request);
Review Comment:
nit: newline please!
also we usually call the return value of `commitOffset` `result`.
```suggestion
CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord>
result = context.commitOffset(request);
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -300,9 +303,7 @@ public void
testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
// m3 should not be able to acquire foo-1 because the epoch is smaller
// than the current partition epoch (11).
- assertThrows(IllegalStateException.class, () -> {
- consumerGroup.updateMember(m3);
- });
+ assertThrows(IllegalStateException.class, () ->
consumerGroup.updateMember(m3));
Review Comment:
nit: Could we avoid these formatting changes? I'm in favor of some of them
but let's not mix them in with this PR.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When assignment epoch(7) <= client epoch(9) <= broker
epoch(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
Review Comment:
Could we swap the order of these checks?
`9` is an OffsetCommit version and we shouldn't run the comparison against a
TxnOffsetCommit version.
```suggestion
if (isTransactional || version >= 9) {
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When assignment epoch(7) <= client epoch(9) <= broker
epoch(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 9, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 9,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10) , exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals("Received member epoch 11 is newer than current
member epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (8) != broker epoch (10) and client epoch (8)
> assignment epoch (7),
+ // stale member epoch exception thrown from validateOffsetCommit.
Review Comment:
This comment doesn't match the check. The client epoch is 6 and the
exception comes from the validator.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When assignment epoch(7) <= client epoch(9) <= broker
epoch(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 9, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 9,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10) , exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals("Received member epoch 11 is newer than current
member epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
Review Comment:
We could maybe drop this check since it's already covered by
`testValidateTransactionalOffsetCommit` and `testValidateOffsetCommit`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
Review Comment:
It's not appropriate to test `TxnOffsetCommit` requests using versions from
`OffsetCommit`.
`TxnOffsetCommit` runs from version 0 to 5 and `OffsetCommit` runs from
version 0 to 10.
Do you think you could parameterize these with a `MethodSource` that returns
```
(isTransactional=false, version=ApiKeys.OFFSET_COMMIT.oldestVersion())
...
(isTransactional=false, version=ApiKeys.OFFSET_COMMIT.latestVersion(true))
(isTransactional=true, version=ApiKeys.TXN_OFFSET_COMMIT.oldestVersion())
...
(isTransactional=true, version=ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When assignment epoch(7) <= client epoch(9) <= broker
epoch(10), no exception thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 9, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 9,
isTransactional, version));
+ }
+
+ // When client epoch (11) > broker epoch (10) , exception thrown
directly from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ assertEquals("Received member epoch 11 is newer than current
member epoch 10.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
+ }
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId,
0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (8) != broker epoch (10) and client epoch (8)
> assignment epoch (7),
+ // stale member epoch exception thrown from validateOffsetCommit.
+ if (version >= 9 || isTransactional) {
+ CommitPartitionValidator validator =
group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7
for partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ for (boolean isTransactional : new boolean[] {true, false}) {
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception
thrown.
Review Comment:
Could we align the ordering of tests within
`testValidateOffsetCommitWithAssignmentEpochValidation` and
`testValidateOffsetCommitWithPartitionPendingRevocation`? I would suggest
client epoch 10, then 7, then 6 or the other way around.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]