lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1526715547

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -565,6 +565,58 @@ public void testSameAssignmentReconciledAgainWhenFenced() {
         assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
     }
 
+    /**
+     * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
+     * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
+     * one topic is not resolvable.
+     *
+     * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
+     * In this case, no rebalance listeners are run.
+     */
+    @Test
+    public void testSameAssignmentReconciledAgainWithMissingTopic() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topic1 = Uuid.randomUuid();
+        Uuid topic2 = Uuid.randomUuid();
+        final Assignment assignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        final Assignment assignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+            .setTopicPartitions(Arrays.asList(
+                new TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
+                new TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+            ));
+        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+
+        // Receive assignment - full reconciliation triggered
+        // stay in RECONCILING state, since an unresolved topic is assigned
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        membershipManager.poll(time.milliseconds());
+        verifyReconciliationTriggeredAndCompleted(membershipManager,
+            Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+        );
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        clearInvocations(membershipManager);
+
+        // Receive extended assignment - assignment received but no reconciliation triggered
+        membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        verifyReconciliationNotTriggered(membershipManager);

Review Comment:
   Just to double check: it is intentional that you're not calling `poll` here, right? That's why a reconciliation is not triggered at this point; otherwise I would expect us to trigger a reconciliation for t1-1. I guess this is what you refer to as the "intermediate" assignment (an assignment received and removed before a call to `poll`).