dajac commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1472793128


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -552,8 +555,63 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 
         // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
         // next reconciliation loop.
-        Set<TopicIdPartition> topic2Assignment = 
topicIdPartitionsSet(topicId2, topic2, 1, 2);
-        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        Map<Uuid, SortedSet<Integer>> topic2Assignment = 
topicIdPartitionsMap(topicId2,  1, 2);
+        assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
+    }
+
+    // Tests the case where topic metadata is not available at the time of the 
assignment,
+    // but is made available later.
+    @Test
+    public void testDelayedMetadataUsedToCompleteAssignment() {
+
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        final TopicIdPartition topicId1Partition0 = new 
TopicIdPartition(topicId1, new TopicPartition(topic1, 0));
+
+        Uuid topicId2 = Uuid.randomUuid();
+        String topic2 = "topic2";
+        final TopicIdPartition topicId2Partition0 = new 
TopicIdPartition(topicId2, new TopicPartition(topic2, 0));
+
+        // Receive assignment with only topic1-0, entering STABLE state.
+        MembershipManagerImpl membershipManager =
+            mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, 
Collections.singletonList(0));
+
+        membershipManager.onHeartbeatRequestSent();
+
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0)));
+        clearInvocations(membershipManager, subscriptionState);
+
+        // New assignment adding a new topic2-0 (not in metadata).
+        // No reconciliation triggered, because new topic in assignment is 
waiting for metadata.
+
+        Map<Uuid, SortedSet<Integer>> newAssignment =
+            mkMap(
+                mkEntry(topicId1, mkSortedSet(0)),
+                mkEntry(topicId2, mkSortedSet(0))
+            );
+
+        receiveAssignment(newAssignment, membershipManager);
+
+        verifyReconciliationNotTriggered(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.singleton(topicId2), 
membershipManager.topicsAwaitingReconciliation());
+        verify(metadata).requestUpdate(anyBoolean());
+        clearInvocations(membershipManager);
+        clearInvocations(commitRequestManager);

Review Comment:
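   nit: You could combine these two `clearInvocations` calls into one line, e.g. `clearInvocations(membershipManager, commitRequestManager);`.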



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -552,8 +555,63 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 
         // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
         // next reconciliation loop.
-        Set<TopicIdPartition> topic2Assignment = 
topicIdPartitionsSet(topicId2, topic2, 1, 2);
-        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        Map<Uuid, SortedSet<Integer>> topic2Assignment = 
topicIdPartitionsMap(topicId2,  1, 2);
+        assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
+    }
+
+    // Tests the case where topic metadata is not available at the time of the 
assignment,
+    // but is made available later.
+    @Test
+    public void testDelayedMetadataUsedToCompleteAssignment() {
+
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        final TopicIdPartition topicId1Partition0 = new 
TopicIdPartition(topicId1, new TopicPartition(topic1, 0));
+
+        Uuid topicId2 = Uuid.randomUuid();
+        String topic2 = "topic2";
+        final TopicIdPartition topicId2Partition0 = new 
TopicIdPartition(topicId2, new TopicPartition(topic2, 0));
+
+        // Receive assignment with only topic1-0, entering STABLE state.
+        MembershipManagerImpl membershipManager =
+            mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, 
Collections.singletonList(0));
+
+        membershipManager.onHeartbeatRequestSent();
+
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0)));
+        clearInvocations(membershipManager, subscriptionState);
+
+        // New assignment adding a new topic2-0 (not in metadata).
+        // No reconciliation triggered, because new topic in assignment is 
waiting for metadata.
+
+        Map<Uuid, SortedSet<Integer>> newAssignment =
+            mkMap(
+                mkEntry(topicId1, mkSortedSet(0)),
+                mkEntry(topicId2, mkSortedSet(0))
+            );
+
+        receiveAssignment(newAssignment, membershipManager);
+
+        verifyReconciliationNotTriggered(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.singleton(topicId2), 
membershipManager.topicsAwaitingReconciliation());
+        verify(metadata).requestUpdate(anyBoolean());
+        clearInvocations(membershipManager);
+        clearInvocations(commitRequestManager);
+
+        // Metadata discovered for topic2. Should trigger reconciliation to 
complete the assignment,
+        // with membership manager entering ACKNOWLEDGING state.
+
+        Map<Uuid, String> fullTopicMetadata = mkMap(
+            mkEntry(topicId1, topic1),
+            mkEntry(topicId2, topic2)
+        );
+        when(metadata.topicNames()).thenReturn(fullTopicMetadata);
+
+        membershipManager.onUpdate(null);
+
+        verifyReconciliationTriggeredAndCompleted(membershipManager,  
Arrays.asList(topicId1Partition0, topicId2Partition0));

Review Comment:
   nit: There is an extra space before `Arrays`.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -552,8 +555,63 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 
         // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
         // next reconciliation loop.
-        Set<TopicIdPartition> topic2Assignment = 
topicIdPartitionsSet(topicId2, topic2, 1, 2);
-        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        Map<Uuid, SortedSet<Integer>> topic2Assignment = 
topicIdPartitionsMap(topicId2,  1, 2);
+        assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
+    }
+
+    // Tests the case where topic metadata is not available at the time of the 
assignment,
+    // but is made available later.
+    @Test
+    public void testDelayedMetadataUsedToCompleteAssignment() {

Review Comment:
   I wonder whether having the same test, but starting with an empty assignment, would 
bring any value. Have you considered it? For context, I was wondering whether 
the second reconciliation would be started if the metadata for the second topic 
is received while the first topic is being reconciled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to