dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087394
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() { .setTopicPartitions(Collections.emptyList()))); } + @Test + public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + // Create a context with one consumer group containing one member. + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + } + })) + .build(); + + // The metadata refresh flag should be true. + ConsumerGroup consumerGroup = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); + assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The member gets partitions 3, 4 and 5 assigned. + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Check next refresh time. + assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch); + } + + @Test + public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + // Create a context with one consumer group containing one member. + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + } + })) + .build(); + + // The metadata refresh flag should be true. + ConsumerGroup consumerGroup = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); + assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Heartbeat. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The metadata refresh flag is set to a future time. + assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch); + + // Rollback the uncommitted changes. This does not rollback the metadata flag + // because it is not using a timeline data structure. + context.rollback(); + + // However, the next heartbeat should detect the divergence based on the epoch and trigger + // a metadata refresh. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + + // The member gets partitions 3, 4 and 5 assigned. + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List<Record> expectedRecords = Arrays.asList( + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Check next refresh time. + assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch); + } + + @Test + public void testGroupIdsByTopics() { + String groupId1 = "group1"; + String groupId2 = "group2"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 1 subscribes to foo and bar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1, + new ConsumerGroupMember.Builder("group1-m1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .build())); + + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 2 subscribes to foo, bar and zar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .build())); + + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 1 subscribes to bar and zar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1, + new ConsumerGroupMember.Builder("group1-m2") + .setSubscribedTopicNames(Arrays.asList("bar", "zar")) + .build())); + + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 2 subscribes to foo and bar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m2") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .build())); + + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 1 is removed. + context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, "group1-m1")); + context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, "group1-m1")); + + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 2 subscribes to nothing. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m1") + .setSubscribedTopicNames(Collections.emptyList()) + .build())); + + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 2 subscribes to foo. Review Comment: right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org