squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2922328822
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -1325,7 +1499,7 @@ public void testFromClassicGroup() {
);
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
- new LogContext(),
+ new LogContext(),
Review Comment:
```suggestion
new LogContext(),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -359,14 +365,13 @@ public void testAddPartitionEpochs() {
assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
// Updating to a smaller epoch should fail.
- assertThrows(IllegalStateException.class, () -> {
+ assertThrows(IllegalStateException.class, () ->
Review Comment:
Could we avoid these formatting changes? I'm in favor of some of them but
let's not mix them in with this PR.
```suggestion
assertThrows(IllegalStateException.class, () -> {
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +939,175 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
Review Comment:
```suggestion
"member-id", "", 10, isTransactional, version
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +939,175 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When assignment epoch(7) <= client epoch(7) <= broker epoch(10), no
exception thrown.
Review Comment:
`testValidateOffsetCommitWithAssignmentEpochValidation` and
`testValidateOffsetCommitWithPartitionPendingRevocation` should be identical
apart from the initial setup.
```suggestion
// When assignment epoch (7) <= client epoch (7) <= broker epoch
(10), no exception thrown.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +939,175 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When assignment epoch(7) <= client epoch(7) <= broker epoch(10), no
exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7),
+ // stale member epoch exception thrown from assignment epoch validator.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7 for
partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 10, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
+ }
+
+ // When partition epoch (7) <= client epoch (7) <= broker epoch (10),
no exception thrown.
Review Comment:
same here
```suggestion
// When assignment epoch (7) <= client epoch (7) <= broker epoch
(10), no exception thrown.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -952,7 +1126,7 @@ public void testAsListedGroup() {
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConsumerGroup group = new ConsumerGroup(
- new LogContext(),
+ new LogContext(),
Review Comment:
Could we avoid these formatting changes? I'm in favor of some of them but
let's not mix them in with this PR.
```suggestion
new LogContext(),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -359,14 +365,13 @@ public void testAddPartitionEpochs() {
assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
// Updating to a smaller epoch should fail.
- assertThrows(IllegalStateException.class, () -> {
+ assertThrows(IllegalStateException.class, () ->
consumerGroup.addPartitionEpochs(
toAssignmentWithEpochs(mkAssignment(
mkTopicAssignment(fooTopicId, 1)
), 10),
10
- );
- });
+ ));
Review Comment:
```suggestion
);
});
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +939,175 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When assignment epoch(7) <= client epoch(7) <= broker epoch(10), no
exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7),
+ // stale member epoch exception thrown from assignment epoch validator.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7 for
partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(List.of("foo"))
Review Comment:
same here
```suggestion
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +939,175 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
Review Comment:
```suggestion
group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +939,175 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When assignment epoch(7) <= client epoch(7) <= broker epoch(10), no
exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7),
+ // stale member epoch exception thrown from assignment epoch validator.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7 for
partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithPartitionPendingRevocation(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 10, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
+ }
+
+ // When partition epoch (7) <= client epoch (7) <= broker epoch (10),
no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) <
partition epoch (7),
Review Comment:
same here
```suggestion
// When client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7),
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]