jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086692


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void 
testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        // foo only has 3 partitions stored in the metadata 
but foo has
+                        // 6 partitions the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // The member gets partitions 3, 4 and 5 assigned.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Check next refresh time.
+        
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, 
consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @Test
+    public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        // foo only has 3 partitions stored in the metadata 
but foo has
+                        // 6 partitions the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // The metadata refresh flag is set to a future time.
+        
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, 
consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+
+        // Rollback the uncommitted changes. This does not rollback the 
metadata flag
+        // because it is not using a timeline data structure.
+        context.rollback();
+
+        // However, the next heartbeat should detect the divergence based on 
the epoch and trigger
+        // a metadata refresh.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+
+        // The member gets partitions 3, 4 and 5 assigned.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Check next refresh time.
+        
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, 
consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @Test
+    public void testGroupIdsByTopics() {
+        String groupId1 = "group1";
+        String groupId2 = "group2";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        assertEquals(Collections.emptySet(), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(Collections.emptySet(), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(Collections.emptySet(), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 1 subscribes to foo and bar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+            new ConsumerGroupMember.Builder("group1-m1")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(Collections.emptySet(), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 2 subscribes to foo, bar and zar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m1")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 1 subscribes to bar and zar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+            new ConsumerGroupMember.Builder("group1-m2")
+                .setSubscribedTopicNames(Arrays.asList("bar", "zar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 2 subscribes to foo and bar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m2")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 1 is removed.
+        
context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, 
"group1-m1"));
+        
context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, 
"group1-m1"));
+
+        assertEquals(mkSet(groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 2 subscribes to nothing.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m1")
+                .setSubscribedTopicNames(Collections.emptyList())
+                .build()));
+
+        assertEquals(mkSet(groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), 
context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1), 
context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 2 subscribes to foo.

Review Comment:
   this is effectively just removing bar from the subscription right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to