vamossagar12 commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1370542261
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1908,6 +1909,715 @@ public void testLeavingMemberBumpsGroupEpoch() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testGroupEpochBumpWhenNewStaticMemberJoins() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + // Use zar only here to ensure that metadata needs to be recomputed. + .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 3 joins the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setInstanceId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) + .setMemberEpoch(11) + .setInstanceId(memberId3) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 1) + )), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 2) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) + ); + + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + } + + @Test + public void testStaticMemberGetsBackAssignmentUponRejoin() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String member2RejoinId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two static members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + })) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + // When the member rejoins, it gets the same assignments. + put(member2RejoinId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2) + ))); + } + } + )); + + // Member 2 leaves the consumer group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setInstanceId(memberId2) + .setMemberEpoch(-2) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + // member epoch of the response would be set to -2 + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(-2), + result.response() + ); + + ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember.Builder(member2) + .setMemberEpoch(-2) + .build(); + // The departing static member will have it's epoch set to -2. + List<Record> expectedRecords = Collections.singletonList( + RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch) + ); + + assertEquals(result.records(), expectedRecords); + // Replay the updated record with -2 epoch so that the group can update itself. + context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, member2UpdatedEpoch)); + + // member 2 rejoins the group with the same instance id + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> rejoinResult = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setMemberId(member2RejoinId) + .setGroupId(groupId) + .setInstanceId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setServerAssignor("range") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(member2RejoinId) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + rejoinResult.response() + ); + + ConsumerGroupMember expectedRejoinedMember = new ConsumerGroupMember.Builder(member2RejoinId) + .setMemberEpoch(10) + .setInstanceId(memberId2) + .setPreviousMemberEpoch(0) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setPartitionsPendingAssignment(mkAssignment( Review Comment: @dajac , I realised that even in the static member re-joining case, while the group epoch doesn't bump, the partitions would be in pending assignment state. I believe, eventually the member would get it's assignments. In that case, this state seems correct to me. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org