kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1776180315
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1341,6 +1342,79 @@ public void testCloseAwaitPendingAsyncCommitComplete() {
assertEquals(1, cb.invoked);
}
+ @Test
+ public void testCloseWithInterruptUsingDefaultTimeout() {
+ SubscriptionState subscriptions = mock(SubscriptionState.class);
+ consumer = spy(newConsumer(
+ mock(FetchBuffer.class),
+ mock(ConsumerInterceptors.class),
+ mock(ConsumerRebalanceListenerInvoker.class),
+ subscriptions,
+ "group-id",
+ "client-id"));
+
+ // This future is completed when the ConsumerRebalanceListener has
been invoked and the
+ // ConsumerRebalanceListenerCallbackCompletedEvent has been enqueued.
+ CompletableFuture<Void> crlCallbackCompletedFuture = new
CompletableFuture<>();
+
+ doAnswer(invocation -> {
+ // When an UnsubscribeEvent is enqueued, don't complete it
immediately. Instead, enqueue the 'rebalance
+ // callback needed' event from the background thread.
+ SortedSet<TopicPartition> partitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ partitions.addAll(subscriptions.assignedPartitions());
+ backgroundEventQueue.add(new
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED,
partitions));
+
+ // Complete the unsubscribe event when the
ConsumerRebalanceListenerCallbackCompletedEvent has been
+ // enqueued.
+ UnsubscribeEvent event = invocation.getArgument(0);
+ crlCallbackCompletedFuture.whenComplete((result, exception) -> {
+ if (exception != null)
+ event.future().completeExceptionally(exception);
+ else
+ event.future().complete(result);
+ });
+
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
+
+ doAnswer(invocation -> {
+ // This triggers the completion of the UnsubscribeEvent above.
+ crlCallbackCompletedFuture.complete(null);
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(ConsumerRebalanceListenerCallbackCompletedEvent.class));
+
+ try {
+ Thread.currentThread().interrupt();
+ assertThrows(InterruptException.class, () -> consumer.close());
+ } finally {
+ Thread.interrupted();
+ }
+
+ verifyUnsubscribeEvent(subscriptions);
+
verify(applicationEventHandler).add(any(ConsumerRebalanceListenerCallbackCompletedEvent.class));
+ }
+
+ @Test
+ public void testCloseWithInterruptUsingZeroTimeout() {
+ SubscriptionState subscriptions = mock(SubscriptionState.class);
+ consumer = spy(newConsumer(
+ mock(FetchBuffer.class),
+ mock(ConsumerInterceptors.class),
+ mock(ConsumerRebalanceListenerInvoker.class),
+ subscriptions,
+ "group-id",
+ "client-id"));
+
+ try {
+ Thread.currentThread().interrupt();
+ assertThrows(InterruptException.class, () ->
consumer.close(Duration.ZERO));
+ } finally {
+ Thread.interrupted();
+ }
+
+ verifyUnsubscribeEvent(subscriptions);
+ verify(applicationEventHandler,
never()).add(any(ConsumerRebalanceListenerCallbackCompletedEvent.class));
Review Comment:
This is a weird test, in that it's verifying that callbacks *aren't* invoked by
`close()`. 🥴 I guess this should just be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]