squah-confluent commented on code in PR #21305:
URL: https://github.com/apache/kafka/pull/21305#discussion_r2691266302
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -479,6 +479,241 @@ public void testConsumerGroupMemberEpochValidation() {
assertEquals(100, result.response().memberEpoch());
}
+ @Test
+ public void testMemberCanRejoinWithEpochZeroInStableState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in STABLE state with epoch 100.
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Prepare assignment for rejoin.
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )))
+ ));
Review Comment:
I don't think this is used in any of the tests since the group epoch is not
bumped?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]