dajac commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1472793128
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -552,8 +555,63 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU // Pending assignment that was discovered in metadata should be ready to reconcile in the // next reconciliation loop. - Set<TopicIdPartition> topic2Assignment = topicIdPartitionsSet(topicId2, topic2, 1, 2); - assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); + Map<Uuid, SortedSet<Integer>> topic2Assignment = topicIdPartitionsMap(topicId2, 1, 2); + assertEquals(topic2Assignment, membershipManager.topicPartitionsAwaitingReconciliation()); + } + + // Tests the case where topic metadata is not available at the time of the assignment, + // but is made available later. + @Test + public void testDelayedMetadataUsedToCompleteAssignment() { + + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + final TopicIdPartition topicId1Partition0 = new TopicIdPartition(topicId1, new TopicPartition(topic1, 0)); + + Uuid topicId2 = Uuid.randomUuid(); + String topic2 = "topic2"; + final TopicIdPartition topicId2Partition0 = new TopicIdPartition(topicId2, new TopicPartition(topic2, 0)); + + // Receive assignment with only topic1-0, entering STABLE state. + MembershipManagerImpl membershipManager = + mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Collections.singletonList(0)); + + membershipManager.onHeartbeatRequestSent(); + + assertEquals(MemberState.STABLE, membershipManager.state()); + when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0))); + clearInvocations(membershipManager, subscriptionState); + + // New assignment adding a new topic2-0 (not in metadata). + // No reconciliation triggered, because new topic in assignment is waiting for metadata. 
+ + Map<Uuid, SortedSet<Integer>> newAssignment = + mkMap( + mkEntry(topicId1, mkSortedSet(0)), + mkEntry(topicId2, mkSortedSet(0)) + ); + + receiveAssignment(newAssignment, membershipManager); + + verifyReconciliationNotTriggered(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.singleton(topicId2), membershipManager.topicsAwaitingReconciliation()); + verify(metadata).requestUpdate(anyBoolean()); + clearInvocations(membershipManager); + clearInvocations(commitRequestManager); Review Comment: nit: You could combine these two `clearInvocations` calls into one line. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -552,8 +555,63 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU // Pending assignment that was discovered in metadata should be ready to reconcile in the // next reconciliation loop. - Set<TopicIdPartition> topic2Assignment = topicIdPartitionsSet(topicId2, topic2, 1, 2); - assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); + Map<Uuid, SortedSet<Integer>> topic2Assignment = topicIdPartitionsMap(topicId2, 1, 2); + assertEquals(topic2Assignment, membershipManager.topicPartitionsAwaitingReconciliation()); + } + + // Tests the case where topic metadata is not available at the time of the assignment, + // but is made available later. + @Test + public void testDelayedMetadataUsedToCompleteAssignment() { + + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + final TopicIdPartition topicId1Partition0 = new TopicIdPartition(topicId1, new TopicPartition(topic1, 0)); + + Uuid topicId2 = Uuid.randomUuid(); + String topic2 = "topic2"; + final TopicIdPartition topicId2Partition0 = new TopicIdPartition(topicId2, new TopicPartition(topic2, 0)); + + // Receive assignment with only topic1-0, entering STABLE state. 
+ MembershipManagerImpl membershipManager = + mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Collections.singletonList(0)); + + membershipManager.onHeartbeatRequestSent(); + + assertEquals(MemberState.STABLE, membershipManager.state()); + when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0))); + clearInvocations(membershipManager, subscriptionState); + + // New assignment adding a new topic2-0 (not in metadata). + // No reconciliation triggered, because new topic in assignment is waiting for metadata. + + Map<Uuid, SortedSet<Integer>> newAssignment = + mkMap( + mkEntry(topicId1, mkSortedSet(0)), + mkEntry(topicId2, mkSortedSet(0)) + ); + + receiveAssignment(newAssignment, membershipManager); + + verifyReconciliationNotTriggered(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.singleton(topicId2), membershipManager.topicsAwaitingReconciliation()); + verify(metadata).requestUpdate(anyBoolean()); + clearInvocations(membershipManager); + clearInvocations(commitRequestManager); + + // Metadata discovered for topic2. Should trigger reconciliation to complete the assignment, + // with membership manager entering ACKNOWLEDGING state. + + Map<Uuid, String> fullTopicMetadata = mkMap( + mkEntry(topicId1, topic1), + mkEntry(topicId2, topic2) + ); + when(metadata.topicNames()).thenReturn(fullTopicMetadata); + + membershipManager.onUpdate(null); + + verifyReconciliationTriggeredAndCompleted(membershipManager, Arrays.asList(topicId1Partition0, topicId2Partition0)); Review Comment: nit: There is an extra space before `Arrays`. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -552,8 +555,63 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU // Pending assignment that was discovered in metadata should be ready to reconcile in the // next reconciliation loop. - Set<TopicIdPartition> topic2Assignment = topicIdPartitionsSet(topicId2, topic2, 1, 2); - assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); + Map<Uuid, SortedSet<Integer>> topic2Assignment = topicIdPartitionsMap(topicId2, 1, 2); + assertEquals(topic2Assignment, membershipManager.topicPartitionsAwaitingReconciliation()); + } + + // Tests the case where topic metadata is not available at the time of the assignment, + // but is made available later. + @Test + public void testDelayedMetadataUsedToCompleteAssignment() { Review Comment: I wonder if having the same test but starting with an empty assignment would bring any value. Have you considered it? For context, I was wondering whether the second reconciliation would be started if the metadata for the second topic is received while the first topic is still being reconciled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org