kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1424283196
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -790,6 +791,297 @@ public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() { verify(membershipManager, never()).transitionToJoining(); } + @Test + public void testListenerCallbacksBasic() { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + String topicName = "topic1"; + Uuid topicId = Uuid.randomUuid(); + + when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet()); + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + + // Step 2: put the state machine into the appropriate... 
state + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertTrue(membershipManager.reconciliationInProgress()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 3: assign partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + topicPartitions(topicName, 0, 1) + ); + + assertFalse(membershipManager.reconciliationInProgress()); + + // Step 4: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); + assertEquals(topicIdPartitions(topicId, topicName, 0, 1), membershipManager.currentAssignment()); + + assertEquals(0, listener.revokedCounter.get()); + assertEquals(1, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + // Step 5: receive an empty assignment, which means we should call revoke + when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1)); + receiveEmptyAssignment(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 6: revoke partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions(topicName, 0, 1) + ); + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 7: assign partitions should still be called, even though it's empty + performCallback( + membershipManager, + invoker, 
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + Collections.emptySortedSet() + ); + assertFalse(membershipManager.reconciliationInProgress()); + + // Step 8: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); + assertFalse(membershipManager.reconciliationInProgress()); + + assertEquals(1, listener.revokedCounter.get()); + assertEquals(2, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + } + + // TODO: Reconciliation needs to support when a listener throws an error on onPartitionsRevoked(). When that + // happens, the assignment step is skipped, which means onPartitionsAssigned() is never run. + // The jury is out on whether or not this is a bug or intentional. + // + // See https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more details. + // @Test + public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + mockOwnedPartition("topic1", 0); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener( + Optional.of(new IllegalArgumentException("Intentional onPartitionsRevoked() error")), + Optional.empty(), + Optional.empty() + ); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + + // Step 2: put the state machine into the appropriate... 
state + receiveEmptyAssignment(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertTrue(membershipManager.reconciliationInProgress()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 3: revoke partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions("topic1", 0) + ); + + assertFalse(membershipManager.reconciliationInProgress()); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + // Step 4: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + assertEquals(1, listener.revokedCounter.get()); + assertEquals(1, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + } + + @Test + public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + mockOwnedPartition("topic1", 0); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener( + Optional.empty(), + Optional.of(new IllegalArgumentException("Intentional onPartitionsAssigned() error")), + Optional.empty() + ); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + + // Step 2: put the state machine into the appropriate... 
state + receiveEmptyAssignment(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertTrue(membershipManager.reconciliationInProgress()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 3: revoke partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions("topic1", 0) + ); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 4: assign partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + Collections.emptySortedSet() + ); + + assertFalse(membershipManager.reconciliationInProgress()); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + // Step 5: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + assertEquals(1, listener.revokedCounter.get()); + assertEquals(1, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + } + + @Test + public void testOnPartitionsLostNoError() { + mockOwnedPartition("topic1", 0); + testOnPartitionsLost(Optional.empty()); + } + + @Test + public void testOnPartitionsLostError() { + mockOwnedPartition("topic1", 0); + testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test"))); + } + + private void testOnPartitionsLost(Optional<RuntimeException> lostError) { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener( + Optional.empty(), + 
Optional.empty(), + lostError + ); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + + // Step 2: put the state machine into the appropriate... state + membershipManager.transitionToFenced(); + assertEquals(MemberState.FENCED, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + // Step 3: invoke the callback + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST, + topicPartitions("topic1", 0) + ); + + // Step 4: Receive ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.JOINING, membershipManager.state()); + + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(1, listener.lostCounter.get()); + } + + private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() { + ConsumerCoordinatorMetrics coordinatorMetrics = new ConsumerCoordinatorMetrics( + subscriptionState, + new Metrics(), + "test-"); + return new ConsumerRebalanceListenerInvoker( + new LogContext(), + subscriptionState, + new MockTime(1), + coordinatorMetrics + ); + } + + private SortedSet<TopicPartition> topicPartitions(String topicName, int... partitions) { + SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new Utils.TopicPartitionComparator()); + + for (int partition : partitions) + topicPartitions.add(new TopicPartition(topicName, partition)); + + return topicPartitions; + } + + private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String topicName, int... 
partitions) { + SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new Utils.TopicIdPartitionComparator()); + + for (int partition : partitions) + topicIdPartitions.add(new TopicIdPartition(topicId, new TopicPartition(topicName, partition))); + + return topicIdPartitions; + } + + private void performCallback(MembershipManagerImpl membershipManager, + ConsumerRebalanceListenerInvoker invoker, + ConsumerRebalanceListenerMethodName expectedMethodName, + SortedSet<TopicPartition> expectedPartitions) { + // We expect only our enqueued event in the background queue. + assertEquals(1, backgroundEventQueue.size()); + assertNotNull(backgroundEventQueue.peek()); + assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, backgroundEventQueue.peek()); + ConsumerRebalanceListenerCallbackNeededEvent neededEvent = (ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll(); + assertNotNull(neededEvent); + assertEquals(expectedMethodName, neededEvent.methodName()); + assertEquals(expectedPartitions, neededEvent.partitions()); + + ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( + invoker, + neededEvent.methodName(), + neededEvent.partitions(), + neededEvent.future() + ); Review Comment: Agreed. The boundaries and scope of the unit and integration tests aren't consistent. 
As the codebase has grown, we haven't been as careful as we should to maintain ease of unit testing 😞 ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -790,6 +791,297 @@ public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() { verify(membershipManager, never()).transitionToJoining(); } + @Test + public void testListenerCallbacksBasic() { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + String topicName = "topic1"; + Uuid topicId = Uuid.randomUuid(); + + when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet()); + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + + // Step 2: put the state machine into the appropriate... 
state + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertTrue(membershipManager.reconciliationInProgress()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 3: assign partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + topicPartitions(topicName, 0, 1) + ); + + assertFalse(membershipManager.reconciliationInProgress()); + + // Step 4: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); + assertEquals(topicIdPartitions(topicId, topicName, 0, 1), membershipManager.currentAssignment()); + + assertEquals(0, listener.revokedCounter.get()); + assertEquals(1, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + // Step 5: receive an empty assignment, which means we should call revoke + when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1)); + receiveEmptyAssignment(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 6: revoke partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions(topicName, 0, 1) + ); + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 7: assign partitions should still be called, even though it's empty + performCallback( + membershipManager, + invoker, 
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + Collections.emptySortedSet() + ); + assertFalse(membershipManager.reconciliationInProgress()); + + // Step 8: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); + assertFalse(membershipManager.reconciliationInProgress()); + + assertEquals(1, listener.revokedCounter.get()); + assertEquals(2, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + } + + // TODO: Reconciliation needs to support when a listener throws an error on onPartitionsRevoked(). When that + // happens, the assignment step is skipped, which means onPartitionsAssigned() is never run. + // The jury is out on whether or not this is a bug or intentional. + // + // See https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more details. + // @Test + public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + mockOwnedPartition("topic1", 0); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener( + Optional.of(new IllegalArgumentException("Intentional onPartitionsRevoked() error")), + Optional.empty(), + Optional.empty() + ); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + + // Step 2: put the state machine into the appropriate... 
state + receiveEmptyAssignment(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertTrue(membershipManager.reconciliationInProgress()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 3: revoke partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions("topic1", 0) + ); + + assertFalse(membershipManager.reconciliationInProgress()); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + // Step 4: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + assertEquals(1, listener.revokedCounter.get()); + assertEquals(1, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + } + + @Test + public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + mockOwnedPartition("topic1", 0); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener( + Optional.empty(), + Optional.of(new IllegalArgumentException("Intentional onPartitionsAssigned() error")), + Optional.empty() + ); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + + // Step 2: put the state machine into the appropriate... 
state + receiveEmptyAssignment(membershipManager); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertTrue(membershipManager.reconciliationInProgress()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 3: revoke partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, + topicPartitions("topic1", 0) + ); + + assertTrue(membershipManager.reconciliationInProgress()); + + // Step 4: assign partitions + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, + Collections.emptySortedSet() + ); + + assertFalse(membershipManager.reconciliationInProgress()); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + // Step 5: Send ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + assertEquals(1, listener.revokedCounter.get()); + assertEquals(1, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + } + + @Test + public void testOnPartitionsLostNoError() { + mockOwnedPartition("topic1", 0); + testOnPartitionsLost(Optional.empty()); + } + + @Test + public void testOnPartitionsLostError() { + mockOwnedPartition("topic1", 0); + testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test"))); + } + + private void testOnPartitionsLost(Optional<RuntimeException> lostError) { + // Step 1: set up mocks + MembershipManagerImpl membershipManager = createMemberInStableState(); + CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener( + Optional.empty(), + 
Optional.empty(), + lostError + ); + ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); + doNothing().when(subscriptionState).markPendingRevocation(anySet()); + + // Step 2: put the state machine into the appropriate... state + membershipManager.transitionToFenced(); + assertEquals(MemberState.FENCED, membershipManager.state()); + assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(0, listener.lostCounter.get()); + + // Step 3: invoke the callback + performCallback( + membershipManager, + invoker, + ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST, + topicPartitions("topic1", 0) + ); + + // Step 4: Receive ack and make sure we're done and our listener was called appropriately + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.JOINING, membershipManager.state()); + + assertEquals(0, listener.revokedCounter.get()); + assertEquals(0, listener.assignedCounter.get()); + assertEquals(1, listener.lostCounter.get()); + } + + private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() { + ConsumerCoordinatorMetrics coordinatorMetrics = new ConsumerCoordinatorMetrics( + subscriptionState, + new Metrics(), + "test-"); + return new ConsumerRebalanceListenerInvoker( + new LogContext(), + subscriptionState, + new MockTime(1), + coordinatorMetrics + ); + } + + private SortedSet<TopicPartition> topicPartitions(String topicName, int... partitions) { + SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new Utils.TopicPartitionComparator()); + + for (int partition : partitions) + topicPartitions.add(new TopicPartition(topicName, partition)); + + return topicPartitions; + } + + private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String topicName, int... 
partitions) { + SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new Utils.TopicIdPartitionComparator()); + + for (int partition : partitions) + topicIdPartitions.add(new TopicIdPartition(topicId, new TopicPartition(topicName, partition))); + + return topicIdPartitions; + } + + private void performCallback(MembershipManagerImpl membershipManager, + ConsumerRebalanceListenerInvoker invoker, + ConsumerRebalanceListenerMethodName expectedMethodName, + SortedSet<TopicPartition> expectedPartitions) { + // We expect only our enqueued event in the background queue. + assertEquals(1, backgroundEventQueue.size()); + assertNotNull(backgroundEventQueue.peek()); + assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, backgroundEventQueue.peek()); + ConsumerRebalanceListenerCallbackNeededEvent neededEvent = (ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll(); + assertNotNull(neededEvent); + assertEquals(expectedMethodName, neededEvent.methodName()); + assertEquals(expectedPartitions, neededEvent.partitions()); + + ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( + invoker, + neededEvent.methodName(), + neededEvent.partitions(), + neededEvent.future() + ); Review Comment: Agreed. The boundaries and scope of the unit and integration tests aren't consistent. As the codebase has grown, we haven't been as careful as we should have been to maintain ease of unit testing 😞 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org