philipnee commented on code in PR #14680: URL: https://github.com/apache/kafka/pull/14680#discussion_r1386995116
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java: ########## @@ -168,6 +200,45 @@ public void testCommitted_ExceptionThrown() { } } + @Test + public void testEnsureCallbackExecutedByApplicationThread() { + final String currentThread = Thread.currentThread().getName(); + ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor(); + MockCommitCallback callback = new MockCommitCallback(); + CountDownLatch latch = new CountDownLatch(1); // Initialize the latch with a count of 1 + try { + CompletableFuture<Void> future = new CompletableFuture<>(); + doReturn(future).when(consumer).commit(new HashMap<>(), false); + assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); + // Simulating some background work + backgroundExecutor.submit(() -> { + future.complete(null); + latch.countDown(); + }); + latch.await(); + assertEquals(1, consumer.callbacks()); + consumer.maybeInvokeCallbacks(); + assertEquals(currentThread, callback.completionThread); + } catch (Exception e) { + fail("Not expecting an exception"); + } finally { + backgroundExecutor.shutdown(); + } + } + + private static class MockCommitCallback implements OffsetCommitCallback { + public int invoked = 0; + public Exception exception = null; + public String completionThread; + + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + System.out.println("invoke"); Review Comment: Java debugging 101: use println -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org