squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2915934260
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,24 @@ public CommitPartitionValidator validateOffsetCommit(
"by members using the modern group protocol");
}
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For members using the consumer protocol, the epoch must either
match the last epoch sent
+ // in a heartbeat or be greater than or equal to the partition's
assignment epoch.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
+
+ // For members using the consumer protocol
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+ if (memberEpoch > member.memberEpoch()) {
Review Comment:
nit: add a blank line between these two `if` blocks.
```suggestion
}

if (memberEpoch > member.memberEpoch()) {
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+ }
Review Comment:
We can lift the resolution up one loop level, so that it is done per topic
instead of per partition.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,24 @@ public CommitPartitionValidator validateOffsetCommit(
"by members using the modern group protocol");
}
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For members using the consumer protocol, the epoch must either
match the last epoch sent
+ // in a heartbeat or be greater than or equal to the partition's
assignment epoch.
Review Comment:
This comment looks misplaced.
```suggestion
// For members using the classic protocol, the epoch must match the
last epoch sent
// in a heartbeat.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +853,41 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch (KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit
request.
+ * @return A validator that checks each partition's assignment epoch.
Review Comment:
Could we re-use the streams javadoc wording?
```suggestion
* @param member The member whose assignments are being
validated.
* @param receivedMemberEpoch The received member epoch.
* @return A validator for per-partition validation.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,24 @@ public CommitPartitionValidator validateOffsetCommit(
"by members using the modern group protocol");
}
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For members using the consumer protocol, the epoch must either
match the last epoch sent
+ // in a heartbeat or be greater than or equal to the partition's
assignment epoch.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
+
+ // For members using the consumer protocol
Review Comment:
```suggestion
// For members using the consumer protocol, the epoch must either
match the last epoch sent
// in a heartbeat or be greater than or equal to the partition's
assignment epoch.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+ }
// Validate commit per-partition
validator.validate(
topic.name(),
- topic.topicId(),
+ resolvedTopicId,
partition.partitionIndex()
);
log.debug("[GroupId {}] Committing offsets {} for
partition {}-{}-{} from member {} with leader epoch {}.",
- request.groupId(), partition.committedOffset(),
topic.topicId(), topic.name(), partition.partitionIndex(),
+ request.groupId(), partition.committedOffset(),
resolvedTopicId, topic.name(), partition.partitionIndex(),
Review Comment:
We shouldn't bake in the resolved topic id when the offset commit request
specified only a topic name. This impacts behavior when topics are recreated.
```suggestion
request.groupId(), partition.committedOffset(),
topic.topicId(), topic.name(), partition.partitionIndex(),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -359,14 +360,12 @@ public void testAddPartitionEpochs() {
assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
// Updating to a smaller epoch should fail.
- assertThrows(IllegalStateException.class, () -> {
- consumerGroup.addPartitionEpochs(
- toAssignmentWithEpochs(mkAssignment(
- mkTopicAssignment(fooTopicId, 1)
- ), 10),
- 10
- );
- });
+ assertThrows(IllegalStateException.class, () ->
consumerGroup.addPartitionEpochs(
+ toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ ), 10),
+ 10
+ ));
Review Comment:
Could we avoid these formatting changes? (x4)
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+ }
// Validate commit per-partition
validator.validate(
topic.name(),
- topic.topicId(),
+ resolvedTopicId,
partition.partitionIndex()
);
log.debug("[GroupId {}] Committing offsets {} for
partition {}-{}-{} from member {} with leader epoch {}.",
- request.groupId(), partition.committedOffset(),
topic.topicId(), topic.name(), partition.partitionIndex(),
+ request.groupId(), partition.committedOffset(),
resolvedTopicId, topic.name(), partition.partitionIndex(),
request.memberId(), partition.committedLeaderEpoch());
topicResponse.partitions().add(new
OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.NONE.code()));
final OffsetAndMetadata offsetAndMetadata =
OffsetAndMetadata.fromRequest(
- topic.topicId(),
+ resolvedTopicId,
Review Comment:
We shouldn't bake in the resolved topic id when the offset commit request
specified only a topic name. This impacts behavior when topics are recreated.
```suggestion
topic.topicId(),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void
testStreamsGroupOffsetCommitFromAdminClient() {
verifyOffsetCommitFromAdminClient(context);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(Map.of(barTopicId, Map.of(0, 5)))
+ .build()
+ );
+
+ OffsetCommitRequestData request = new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(3) // stale member epoch
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(barTopicName)
+ .setTopicId(Uuid.ZERO_UUID) // ZERO_UUID topic_id
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ));
+
+ // client epoch (3) < assignment epoch (5), fail
Review Comment:
style nit: Could we either drop these comments or write them as full
sentences?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
Review Comment:
Could we move these new `testValidateOffsetCommit...` tests just after
`testValidateOffsetCommit`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void
testStreamsGroupOffsetCommitFromAdminClient() {
verifyOffsetCommitFromAdminClient(context);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(Map.of(barTopicId, Map.of(0, 5)))
Review Comment:
Can we use the `mkAssignment`-style methods?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
Review Comment:
nit: I would inline and constant-fold these as much as possible, including
the `String.format`s, except for `isTransactional`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ int clientEpoch = memberEpoch + 1;
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", clientEpoch,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", clientEpoch, memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch (10) == broker epoch (10), no exception thrown
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", memberEpoch + 1,
memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // partition epoch <= client epoch <= broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch != broker epoch and client epoch < partition epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithUnassignedPartition(short version)
{
+ Uuid assignedTopicId = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+ String unassignedTopicName = "bar";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch,
partitionId)))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId,
assignmentEpoch, partitionId + 1)))
+ .build());
+
+ // Commit an unassigned partition
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(unassignedTopicName, unassignedTopicId,
partitionId));
+ assertEquals(
+ String.format("Partition %s-%d is not assigned or pending
revocation for member.",
+ unassignedTopicName, partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithMultiplePartitionsAndEpochs(short
version) {
Review Comment:
I'm not sure this test is necessary?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ int clientEpoch = memberEpoch + 1;
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", clientEpoch,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", clientEpoch, memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch (10) == broker epoch (10), no exception thrown
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", memberEpoch + 1,
memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // partition epoch <= client epoch <= broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch != broker epoch and client epoch < partition epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithUnassignedPartition(short version)
{
+ Uuid assignedTopicId = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+ String unassignedTopicName = "bar";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch,
partitionId)))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId,
assignmentEpoch, partitionId + 1)))
+ .build());
+
+ // Commit an unassigned partition
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(unassignedTopicName, unassignedTopicId,
partitionId));
+ assertEquals(
+ String.format("Partition %s-%d is not assigned or pending
revocation for member.",
+ unassignedTopicName, partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithMultiplePartitionsAndEpochs(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int memberEpoch = 10;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 3, 0),
+ mkTopicAssignmentWithEpochs(topicId, 5, 1),
+ mkTopicAssignmentWithEpochs(topicId, 8, 2)))
+ .build());
+
+ // with clientEpoch=6: partitions 0,1 should pass, partition 2 should
fail
+ if (version >= 9) {
+ int clientEpoch = 6;
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", clientEpoch, isTransactional, version
+ );
+
+ // For partition 0 and 1, assignment epoch (3 or 5) < client
epoch (6) < broker epoch (10), accept
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
0));
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
1));
+
+ // For partition 2, assignment epoch (8) > client epoch (6), reject
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, 2));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ clientEpoch, 8, topicName, 2),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void
testValidateOffsetCommitPrioritizeAssignedOverPendingRevocation(short version) {
Review Comment:
I'm not sure this test is necessary? It's not clear to me what it's testing.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void
testStreamsGroupOffsetCommitFromAdminClient() {
verifyOffsetCommitFromAdminClient(context);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(Map.of(barTopicId, Map.of(0, 5)))
+ .build()
+ );
+
+ OffsetCommitRequestData request = new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(3) // stale member epoch
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName(barTopicName)
+ .setTopicId(Uuid.ZERO_UUID) // ZERO_UUID topic_id
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ));
+
+ // client epoch (3) < assignment epoch (5), fail
+ assertThrows(StaleMemberEpochException.class, () ->
context.commitOffset(request));
Review Comment:
We can move the `setGenerationIdOrMemberEpoch` setup closer to the assertion
that depends on it.
```suggestion
request.setGenerationIdOrMemberEpoch(3);
assertThrows(StaleMemberEpochException.class, () ->
context.commitOffset(request));
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
Review Comment:
I don't think we need to set up the subscribed topic names?
```suggestion
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
Review Comment:
style nit: Could we write these comments as short sentences?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ int clientEpoch = memberEpoch + 1;
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", clientEpoch,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", clientEpoch, memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch (10) == broker epoch (10), no exception thrown
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", memberEpoch + 1,
memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // partition epoch <= client epoch <= broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch != broker epoch and client epoch < partition epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithUnassignedPartition(short version)
{
+ Uuid assignedTopicId = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+ String unassignedTopicName = "bar";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch,
partitionId)))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId,
assignmentEpoch, partitionId + 1)))
+ .build());
+
+ // Commit an unassigned partition
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(unassignedTopicName, unassignedTopicId,
partitionId));
Review Comment:
Can we test both an unassigned topic and an unassigned partition?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ int clientEpoch = memberEpoch + 1;
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", clientEpoch,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", clientEpoch, memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch (10) == broker epoch (10), no exception thrown
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", memberEpoch + 1,
memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // partition epoch <= client epoch <= broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch != broker epoch and client epoch < partition epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithUnassignedPartition(short version)
{
+ Uuid assignedTopicId = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+ String unassignedTopicName = "bar";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch,
partitionId)))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId,
assignmentEpoch, partitionId + 1)))
Review Comment:
```suggestion
mkTopicAssignmentWithEpochs(assignedTopicId,
assignmentEpoch, partitionId + 1)))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
Review Comment:
nit: missing blank line before this comment
```suggestion
// client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void
testStreamsGroupOffsetCommitFromAdminClient() {
verifyOffsetCommitFromAdminClient(context);
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
Review Comment:
Could we move this method down one?
`verifyOffsetCommitFromAdminClient` is supposed to be grouped together with
`testConsumerGroupOffsetCommitFromAdminClient` and
`testStreamsGroupOffsetCommitFromAdminClient`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
Review Comment:
We shouldn't reach inside groupMetadataManager if we want the metadata image.
`OffsetMetadataManager` already takes a metadata image as a constructor
parameter. We should store it ourselves and update the type to follow the same
pattern as `GroupMetadataManager`. We will have to implement an
`onMetadataUpdate` method too.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
Review Comment:
Since the main driver for the KIP was transactional offset commits, it's
best that we test both transactional and non-transactional offset commits.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
cache
);
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch = broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ int clientEpoch = memberEpoch + 1;
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", clientEpoch,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", clientEpoch, memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7)
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)))
+ .build());
+
+ // client epoch (10) == broker epoch (10), no exception thrown
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", memberEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch,
isTransactional, version));
+ }
+
+ // client epoch (11) > broker epoch (10) - exception thrown directly
from validateOffsetCommit
+ if (version >= 9) {
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ assertEquals(
+ String.format("Received member epoch %d is newer than "
+ + "current member epoch %d.", memberEpoch + 1,
memberEpoch),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", memberEpoch + 1,
isTransactional, version));
+ }
+
+ // partition epoch <= client epoch <= broker epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+
+ // client epoch != broker epoch and client epoch < partition epoch
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch - 1, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ assignmentEpoch - 1, assignmentEpoch, topicName,
partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch -
1, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithUnassignedPartition(short version)
{
+ Uuid assignedTopicId = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+ String unassignedTopicName = "bar";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 7;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch,
partitionId)))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId,
assignmentEpoch, partitionId + 1)))
+ .build());
+
+ // Commit an unassigned partition
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", assignmentEpoch, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(unassignedTopicName, unassignedTopicId,
partitionId));
+ assertEquals(
+ String.format("Partition %s-%d is not assigned or pending
revocation for member.",
+ unassignedTopicName, partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", assignmentEpoch,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommitWithMultiplePartitionsAndEpochs(short
version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int memberEpoch = 10;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 3, 0),
+ mkTopicAssignmentWithEpochs(topicId, 5, 1),
+ mkTopicAssignmentWithEpochs(topicId, 8, 2)))
+ .build());
+
+ // with clientEpoch=6: partitions 0,1 should pass, partition 2 should
fail
+ if (version >= 9) {
+ int clientEpoch = 6;
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", clientEpoch, isTransactional, version
+ );
+
+ // For partition 0 and 1, assignment epoch (3 or 5) < client
epoch (6) < broker epoch (10), accept
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
0));
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
1));
+
+ // For partition 2, assignment epoch (8) > client epoch (6), reject
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, 2));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ clientEpoch, 8, topicName, 2),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void
testValidateOffsetCommitPrioritizeAssignedOverPendingRevocation(short version) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+ int partitionId = 0;
+ int memberEpoch = 10;
+ int assignmentEpoch = 5;
+ boolean isTransactional = false;
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(memberEpoch)
+ .setSubscribedTopicNames(List.of(topicName))
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, assignmentEpoch,
partitionId)
+ ))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 4, 2),
+ mkTopicAssignmentWithEpochs(topicId, 7, 1)))
+ .build());
+
+ // clientEpoch (4) < assignedPartitionEpoch (5), reject
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 4, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate(topicName, topicId, partitionId));
+ assertEquals(
+ String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
+ 4, assignmentEpoch, topicName, partitionId),
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 4,
isTransactional, version));
+ }
+
+ // broker epoch (10) >= client epoch (6) >= assigned partition epoch
(5), accept
+ if (version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate(topicName, topicId,
partitionId));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void
testValidateOffsetCommitWithMultiplePartitionsPendingRevocation(short version) {
Review Comment:
I'm not sure this test is necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]