lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526715547


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
         assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
     }
 
+    /**
+     * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+     * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+     * one topic is not resolvable.
+     *
+     * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+     * In this case, no rebalance listeners are run.
+     */
+    @Test
+    public void testSameAssignmentReconciledAgainWithMissingTopic() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topic1 = Uuid.randomUuid();
+        Uuid topic2 = Uuid.randomUuid();
+        final Assignment assignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        final Assignment assignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+
+        // Receive assignment - full reconciliation triggered
+        // stay in RECONCILING state, since an unresolved topic is assigned
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        membershipManager.poll(time.milliseconds());
+        verifyReconciliationTriggeredAndCompleted(membershipManager,
+            Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+        );
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        clearInvocations(membershipManager);
+
+        // Receive extended assignment - assignment received but no reconciliation triggered
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        verifyReconciliationNotTriggered(membershipManager);

Review Comment:
   Just to double check: it is intentional that you're not calling poll here,
   right? That would explain why no reconciliation is triggered at this point;
   otherwise I would expect us to trigger a reconciliation for t1-1. I guess
   this is what you refer to as the "intermediate" assignment (an assignment
   received and then removed before a call to poll).
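
   To make the question concrete, here is a toy sketch of the behaviour I am assuming: a heartbeat response only records the target assignment, and reconciliation runs only on poll. `ToyMember` and its methods are hypothetical names for illustration, not the real `MembershipManagerImpl`, and the sketch does not model acknowledgements or unresolved topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model only: hypothetical names, not the real MembershipManagerImpl. */
class ToyMember {
    private Set<String> target = new TreeSet<>();   // latest assignment seen in a heartbeat
    private Set<String> current = new TreeSet<>();  // last assignment actually reconciled
    final List<Set<String>> reconciled = new ArrayList<>();

    // Heartbeat response: only records the target; no reconciliation happens here.
    void onHeartbeatSuccess(Set<String> assignment) {
        target = new TreeSet<>(assignment);
    }

    // Poll: reconciles whatever the target is at this moment.
    // Returns true if a reconciliation actually ran.
    boolean poll() {
        if (target.equals(current)) {
            return false;
        }
        current = new TreeSet<>(target);
        reconciled.add(current);
        return true;
    }

    public static void main(String[] args) {
        ToyMember member = new ToyMember();
        member.onHeartbeatSuccess(Set.of("t1-0"));
        System.out.println("first poll reconciled: " + member.poll());

        // Intermediate assignment: replaced before any poll, so t1-1 is never reconciled.
        member.onHeartbeatSuccess(Set.of("t1-0", "t1-1"));
        member.onHeartbeatSuccess(Set.of("t1-0"));
        System.out.println("second poll reconciled: " + member.poll());
        System.out.println("reconciled assignments: " + member.reconciled);
    }
}
```

   In this toy model the second poll does nothing because the partition set is unchanged, whereas the javadoc says the real member still has to ack and apply the last assignment; that difference is the part the test should pin down.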



