kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424283196


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
         verify(membershipManager, never()).transitionToJoining();
     }
 
+    @Test
+    public void testListenerCallbacksBasic() {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        String topicName = "topic1";
+        Uuid topicId = Uuid.randomUuid();
+
+        
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+
+        // Step 2: put the state machine into the appropriate... state
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertTrue(membershipManager.reconciliationInProgress());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 3: assign partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                topicPartitions(topicName, 0, 1)
+        );
+
+        assertFalse(membershipManager.reconciliationInProgress());
+
+        // Step 4: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertEquals(topicIdPartitions(topicId, topicName, 0, 1), 
membershipManager.currentAssignment());
+
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(1, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        // Step 5: receive an empty assignment, which means we should call 
revoke
+        
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
 0, 1));
+        receiveEmptyAssignment(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 6: revoke partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+                topicPartitions(topicName, 0, 1)
+        );
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 7: assign partitions should still be called, even though it's 
empty
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                Collections.emptySortedSet()
+        );
+        assertFalse(membershipManager.reconciliationInProgress());
+
+        // Step 8: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertFalse(membershipManager.reconciliationInProgress());
+
+        assertEquals(1, listener.revokedCounter.get());
+        assertEquals(2, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+    }
+
+    // TODO: Reconciliation needs to support when a listener throws an error 
on onPartitionsRevoked(). When that
+    //       happens, the assignment step is skipped, which means 
onPartitionsAssigned() is never run.
+    //       The jury is out on whether or not this is a bug or intentional.
+    //
+    //       See 
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more 
details.
+    // @Test
+    public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        mockOwnedPartition("topic1", 0);
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
+                Optional.of(new IllegalArgumentException("Intentional 
onPartitionsRevoked() error")),
+                Optional.empty(),
+                Optional.empty()
+        );
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+        // Step 2: put the state machine into the appropriate... state
+        receiveEmptyAssignment(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertTrue(membershipManager.reconciliationInProgress());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 3: revoke partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+                topicPartitions("topic1", 0)
+        );
+
+        assertFalse(membershipManager.reconciliationInProgress());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        // Step 4: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        assertEquals(1, listener.revokedCounter.get());
+        assertEquals(1, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+    }
+
+    @Test
+    public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        mockOwnedPartition("topic1", 0);
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
+                Optional.empty(),
+                Optional.of(new IllegalArgumentException("Intentional 
onPartitionsAssigned() error")),
+                Optional.empty()
+        );
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+        // Step 2: put the state machine into the appropriate... state
+        receiveEmptyAssignment(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertTrue(membershipManager.reconciliationInProgress());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 3: revoke partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+                topicPartitions("topic1", 0)
+        );
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 4: assign partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                Collections.emptySortedSet()
+        );
+
+        assertFalse(membershipManager.reconciliationInProgress());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        // Step 5: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        assertEquals(1, listener.revokedCounter.get());
+        assertEquals(1, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+    }
+
+    @Test
+    public void testOnPartitionsLostNoError() {
+        mockOwnedPartition("topic1", 0);
+        testOnPartitionsLost(Optional.empty());
+    }
+
+    @Test
+    public void testOnPartitionsLostError() {
+        mockOwnedPartition("topic1", 0);
+        testOnPartitionsLost(Optional.of(new KafkaException("Intentional error 
for test")));
+    }
+
+    private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
+                Optional.empty(),
+                Optional.empty(),
+                lostError
+        );
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+        // Step 2: put the state machine into the appropriate... state
+        membershipManager.transitionToFenced();
+        assertEquals(MemberState.FENCED, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        // Step 3: invoke the callback
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+                topicPartitions("topic1", 0)
+        );
+
+        // Step 4: Receive ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.JOINING, membershipManager.state());
+
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(1, listener.lostCounter.get());
+    }
+
+    private ConsumerRebalanceListenerInvoker 
consumerRebalanceListenerInvoker() {
+        ConsumerCoordinatorMetrics coordinatorMetrics = new 
ConsumerCoordinatorMetrics(
+                subscriptionState,
+                new Metrics(),
+                "test-");
+        return new ConsumerRebalanceListenerInvoker(
+                new LogContext(),
+                subscriptionState,
+                new MockTime(1),
+                coordinatorMetrics
+        );
+    }
+
+    private SortedSet<TopicPartition> topicPartitions(String topicName, int... 
partitions) {
+        SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new 
Utils.TopicPartitionComparator());
+
+        for (int partition : partitions)
+            topicPartitions.add(new TopicPartition(topicName, partition));
+
+        return topicPartitions;
+    }
+
+    private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String 
topicName, int... partitions) {
+        SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new 
Utils.TopicIdPartitionComparator());
+
+        for (int partition : partitions)
+            topicIdPartitions.add(new TopicIdPartition(topicId, new 
TopicPartition(topicName, partition)));
+
+        return topicIdPartitions;
+    }
+
+    private void performCallback(MembershipManagerImpl membershipManager,
+                                 ConsumerRebalanceListenerInvoker invoker,
+                                 ConsumerRebalanceListenerMethodName 
expectedMethodName,
+                                 SortedSet<TopicPartition> expectedPartitions) 
{
+        // We expect only our enqueued event in the background queue.
+        assertEquals(1, backgroundEventQueue.size());
+        assertNotNull(backgroundEventQueue.peek());
+        assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, 
backgroundEventQueue.peek());
+        ConsumerRebalanceListenerCallbackNeededEvent neededEvent = 
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+        assertNotNull(neededEvent);
+        assertEquals(expectedMethodName, neededEvent.methodName());
+        assertEquals(expectedPartitions, neededEvent.partitions());
+
+        ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
+                invoker,
+                neededEvent.methodName(),
+                neededEvent.partitions(),
+                neededEvent.future()
+        );

Review Comment:
   Agreed. The boundaries and scope of the unit and integration tests isn't 
consistent. As the codebase has grown, we haven't been as careful as we should 
to maintain ease of unit testing 😞 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
         verify(membershipManager, never()).transitionToJoining();
     }
 
+    @Test
+    public void testListenerCallbacksBasic() {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        String topicName = "topic1";
+        Uuid topicId = Uuid.randomUuid();
+
+        
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+
+        // Step 2: put the state machine into the appropriate... state
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertTrue(membershipManager.reconciliationInProgress());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 3: assign partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                topicPartitions(topicName, 0, 1)
+        );
+
+        assertFalse(membershipManager.reconciliationInProgress());
+
+        // Step 4: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertEquals(topicIdPartitions(topicId, topicName, 0, 1), 
membershipManager.currentAssignment());
+
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(1, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        // Step 5: receive an empty assignment, which means we should call 
revoke
+        
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
 0, 1));
+        receiveEmptyAssignment(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 6: revoke partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+                topicPartitions(topicName, 0, 1)
+        );
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 7: assign partitions should still be called, even though it's 
empty
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                Collections.emptySortedSet()
+        );
+        assertFalse(membershipManager.reconciliationInProgress());
+
+        // Step 8: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertFalse(membershipManager.reconciliationInProgress());
+
+        assertEquals(1, listener.revokedCounter.get());
+        assertEquals(2, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+    }
+
+    // TODO: Reconciliation needs to support when a listener throws an error 
on onPartitionsRevoked(). When that
+    //       happens, the assignment step is skipped, which means 
onPartitionsAssigned() is never run.
+    //       The jury is out on whether or not this is a bug or intentional.
+    //
+    //       See 
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more 
details.
+    // @Test
+    public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        mockOwnedPartition("topic1", 0);
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
+                Optional.of(new IllegalArgumentException("Intentional 
onPartitionsRevoked() error")),
+                Optional.empty(),
+                Optional.empty()
+        );
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+        // Step 2: put the state machine into the appropriate... state
+        receiveEmptyAssignment(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertTrue(membershipManager.reconciliationInProgress());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 3: revoke partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+                topicPartitions("topic1", 0)
+        );
+
+        assertFalse(membershipManager.reconciliationInProgress());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        // Step 4: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        assertEquals(1, listener.revokedCounter.get());
+        assertEquals(1, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+    }
+
+    @Test
+    public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        mockOwnedPartition("topic1", 0);
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
+                Optional.empty(),
+                Optional.of(new IllegalArgumentException("Intentional 
onPartitionsAssigned() error")),
+                Optional.empty()
+        );
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+        // Step 2: put the state machine into the appropriate... state
+        receiveEmptyAssignment(membershipManager);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertTrue(membershipManager.reconciliationInProgress());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 3: revoke partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+                topicPartitions("topic1", 0)
+        );
+
+        assertTrue(membershipManager.reconciliationInProgress());
+
+        // Step 4: assign partitions
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                Collections.emptySortedSet()
+        );
+
+        assertFalse(membershipManager.reconciliationInProgress());
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        // Step 5: Send ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        assertEquals(1, listener.revokedCounter.get());
+        assertEquals(1, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+    }
+
+    @Test
+    public void testOnPartitionsLostNoError() {
+        mockOwnedPartition("topic1", 0);
+        testOnPartitionsLost(Optional.empty());
+    }
+
+    @Test
+    public void testOnPartitionsLostError() {
+        mockOwnedPartition("topic1", 0);
+        testOnPartitionsLost(Optional.of(new KafkaException("Intentional error 
for test")));
+    }
+
+    private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+        // Step 1: set up mocks
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener(
+                Optional.empty(),
+                Optional.empty(),
+                lostError
+        );
+        ConsumerRebalanceListenerInvoker invoker = 
consumerRebalanceListenerInvoker();
+
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+        doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+        // Step 2: put the state machine into the appropriate... state
+        membershipManager.transitionToFenced();
+        assertEquals(MemberState.FENCED, membershipManager.state());
+        assertEquals(Collections.emptySet(), 
membershipManager.currentAssignment());
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(0, listener.lostCounter.get());
+
+        // Step 3: invoke the callback
+        performCallback(
+                membershipManager,
+                invoker,
+                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+                topicPartitions("topic1", 0)
+        );
+
+        // Step 4: Receive ack and make sure we're done and our listener was 
called appropriately
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.JOINING, membershipManager.state());
+
+        assertEquals(0, listener.revokedCounter.get());
+        assertEquals(0, listener.assignedCounter.get());
+        assertEquals(1, listener.lostCounter.get());
+    }
+
+    private ConsumerRebalanceListenerInvoker 
consumerRebalanceListenerInvoker() {
+        ConsumerCoordinatorMetrics coordinatorMetrics = new 
ConsumerCoordinatorMetrics(
+                subscriptionState,
+                new Metrics(),
+                "test-");
+        return new ConsumerRebalanceListenerInvoker(
+                new LogContext(),
+                subscriptionState,
+                new MockTime(1),
+                coordinatorMetrics
+        );
+    }
+
+    private SortedSet<TopicPartition> topicPartitions(String topicName, int... 
partitions) {
+        SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new 
Utils.TopicPartitionComparator());
+
+        for (int partition : partitions)
+            topicPartitions.add(new TopicPartition(topicName, partition));
+
+        return topicPartitions;
+    }
+
+    private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String 
topicName, int... partitions) {
+        SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new 
Utils.TopicIdPartitionComparator());
+
+        for (int partition : partitions)
+            topicIdPartitions.add(new TopicIdPartition(topicId, new 
TopicPartition(topicName, partition)));
+
+        return topicIdPartitions;
+    }
+
+    private void performCallback(MembershipManagerImpl membershipManager,
+                                 ConsumerRebalanceListenerInvoker invoker,
+                                 ConsumerRebalanceListenerMethodName 
expectedMethodName,
+                                 SortedSet<TopicPartition> expectedPartitions) 
{
+        // We expect only our enqueued event in the background queue.
+        assertEquals(1, backgroundEventQueue.size());
+        assertNotNull(backgroundEventQueue.peek());
+        assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, 
backgroundEventQueue.peek());
+        ConsumerRebalanceListenerCallbackNeededEvent neededEvent = 
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+        assertNotNull(neededEvent);
+        assertEquals(expectedMethodName, neededEvent.methodName());
+        assertEquals(expectedPartitions, neededEvent.partitions());
+
+        ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
+                invoker,
+                neededEvent.methodName(),
+                neededEvent.partitions(),
+                neededEvent.future()
+        );

Review Comment:
   Agreed. The boundaries and scope of the unit and integration tests isn't 
consistent. As the codebase has grown, we haven't been as careful as we should 
to maintain ease of unit testing 😞 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to