lucasbru commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1472843719


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -552,8 +555,63 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 
         // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
         // next reconciliation loop.
-        Set<TopicIdPartition> topic2Assignment = 
topicIdPartitionsSet(topicId2, topic2, 1, 2);
-        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        Map<Uuid, SortedSet<Integer>> topic2Assignment = 
topicIdPartitionsMap(topicId2,  1, 2);
+        assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
+    }
+
+    // Tests the case where topic metadata is not available at the time of the 
assignment,
+    // but is made available later.
+    @Test
+    public void testDelayedMetadataUsedToCompleteAssignment() {

Review Comment:
   If I understand your ask correctly, I think the test wouldn't pass yet. If 
the metadata arrives while the first topic is being reconciled, we'll not 
retrigger reconciliation, unless we get another assignment via the heartbeat. I 
will fix this and implement the test you are suggesting in 
[KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832)), where we'll 
check for the need to reconcile regularly during poll.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -552,8 +555,63 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 
         // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
         // next reconciliation loop.
-        Set<TopicIdPartition> topic2Assignment = 
topicIdPartitionsSet(topicId2, topic2, 1, 2);
-        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        Map<Uuid, SortedSet<Integer>> topic2Assignment = 
topicIdPartitionsMap(topicId2,  1, 2);
+        assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
+    }
+
+    // Tests the case where topic metadata is not available at the time of the 
assignment,
+    // but is made available later.
+    @Test
+    public void testDelayedMetadataUsedToCompleteAssignment() {
+
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        final TopicIdPartition topicId1Partition0 = new 
TopicIdPartition(topicId1, new TopicPartition(topic1, 0));
+
+        Uuid topicId2 = Uuid.randomUuid();
+        String topic2 = "topic2";
+        final TopicIdPartition topicId2Partition0 = new 
TopicIdPartition(topicId2, new TopicPartition(topic2, 0));
+
+        // Receive assignment with only topic1-0, entering STABLE state.
+        MembershipManagerImpl membershipManager =
+            mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, 
Collections.singletonList(0));
+
+        membershipManager.onHeartbeatRequestSent();
+
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0)));
+        clearInvocations(membershipManager, subscriptionState);
+
+        // New assignment adding a new topic2-0 (not in metadata).
+        // No reconciliation triggered, because new topic in assignment is 
waiting for metadata.
+
+        Map<Uuid, SortedSet<Integer>> newAssignment =
+            mkMap(
+                mkEntry(topicId1, mkSortedSet(0)),
+                mkEntry(topicId2, mkSortedSet(0))
+            );
+
+        receiveAssignment(newAssignment, membershipManager);
+
+        verifyReconciliationNotTriggered(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.singleton(topicId2), 
membershipManager.topicsAwaitingReconciliation());
+        verify(metadata).requestUpdate(anyBoolean());
+        clearInvocations(membershipManager);
+        clearInvocations(commitRequestManager);

Review Comment:
   Done



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -552,8 +555,63 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 
         // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
         // next reconciliation loop.
-        Set<TopicIdPartition> topic2Assignment = 
topicIdPartitionsSet(topicId2, topic2, 1, 2);
-        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        Map<Uuid, SortedSet<Integer>> topic2Assignment = 
topicIdPartitionsMap(topicId2,  1, 2);
+        assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
+    }
+
+    // Tests the case where topic metadata is not available at the time of the 
assignment,
+    // but is made available later.
+    @Test
+    public void testDelayedMetadataUsedToCompleteAssignment() {
+
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        final TopicIdPartition topicId1Partition0 = new 
TopicIdPartition(topicId1, new TopicPartition(topic1, 0));
+
+        Uuid topicId2 = Uuid.randomUuid();
+        String topic2 = "topic2";
+        final TopicIdPartition topicId2Partition0 = new 
TopicIdPartition(topicId2, new TopicPartition(topic2, 0));
+
+        // Receive assignment with only topic1-0, entering STABLE state.
+        MembershipManagerImpl membershipManager =
+            mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, 
Collections.singletonList(0));
+
+        membershipManager.onHeartbeatRequestSent();
+
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0)));
+        clearInvocations(membershipManager, subscriptionState);
+
+        // New assignment adding a new topic2-0 (not in metadata).
+        // No reconciliation triggered, because new topic in assignment is 
waiting for metadata.
+
+        Map<Uuid, SortedSet<Integer>> newAssignment =
+            mkMap(
+                mkEntry(topicId1, mkSortedSet(0)),
+                mkEntry(topicId2, mkSortedSet(0))
+            );
+
+        receiveAssignment(newAssignment, membershipManager);
+
+        verifyReconciliationNotTriggered(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.singleton(topicId2), 
membershipManager.topicsAwaitingReconciliation());
+        verify(metadata).requestUpdate(anyBoolean());
+        clearInvocations(membershipManager);
+        clearInvocations(commitRequestManager);
+
+        // Metadata discovered for topic2. Should trigger reconciliation to 
complete the assignment,
+        // with membership manager entering ACKNOWLEDGING state.
+
+        Map<Uuid, String> fullTopicMetadata = mkMap(
+            mkEntry(topicId1, topic1),
+            mkEntry(topicId2, topic2)
+        );
+        when(metadata.topicNames()).thenReturn(fullTopicMetadata);
+
+        membershipManager.onUpdate(null);
+
+        verifyReconciliationTriggeredAndCompleted(membershipManager,  
Arrays.asList(topicId1Partition0, topicId2Partition0));

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to