lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568770827


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) {
+        if (lastPendingAsyncCommit == null) {
+            return;
+        }
+
+        try {
+            final CompletableFuture<Void> futureToAwait = new CompletableFuture<>();
+            // We don't want the wake-up trigger to complete our pending async commit future,
+            // so create new future here. Any errors in the pending async commit will be handled
+            // by the async commit future / the commit callback - here, we just want to wait for it to complete.
+            lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null));
+            if (!disableWakeup) {
+                wakeupTrigger.setActiveTask(futureToAwait);
+            }
+            ConsumerUtils.getResult(futureToAwait, timer);

Review Comment:
   Yes. I think if `lastPendingAsyncCommit` is completed before entering here, 
the `whenComplete` will execute immediately.
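
   As a hedged aside (a standalone JDK sketch, not code from the PR): attaching `whenComplete` to a `CompletableFuture` that is already complete runs the callback immediately on the calling thread, so the wait degenerates to a no-op in that case.

   ```java
   import java.util.concurrent.CompletableFuture;

   public class WhenCompleteDemo {
       public static void main(String[] args) {
           // A pending commit future that has already finished.
           CompletableFuture<Void> lastPendingAsyncCommit = CompletableFuture.completedFuture(null);

           // Same pattern as in awaitPendingAsyncCommitsAndExecuteCommitCallbacks:
           // wrap completion in a separate future so a wakeup cannot complete the original.
           CompletableFuture<Void> futureToAwait = new CompletableFuture<>();
           lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null));

           // The callback already ran, so there is nothing left to wait for.
           System.out.println(futureToAwait.isDone()); // prints: true
       }
   }
   ```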



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest {
     public void resetAll() {
         backgroundEventQueue.clear();
         if (consumer != null) {
-            consumer.close(Duration.ZERO);
+            try {
+                consumer.close(Duration.ZERO);
+            } catch (Exception e) {
+                assertInstanceOf(KafkaException.class, e);
+            }

Review Comment:
   `resetAll` isn't supposed to test anything, so this also shouldn't mask anything. It's purely for cleanup. In this case, it only affects two tests that will time out on close (since we don't mock an async commit response). So let me do it. But in general, I wonder whether adding clean-up logic to the tests themselves wouldn't reduce the readability/clarity of the actual test cases.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for it to complete (time out)
+        assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   Yes. I agree, it's a bit weird, but Mockito records those invocations together with references to all the arguments. That's also why spying on lots of objects in busy event loops will accumulate lots of such "invocation objects" in memory.
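
   A minimal standalone sketch of what I mean (the `Handler` interface is made up for illustration; only `Mockito.mockingDetails` from the public Mockito API is used): every call on a mock or spy is kept as an invocation object holding references to its arguments, so a busy event loop keeps them reachable until the mock is reset or collected.

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.mockingDetails;

   import java.util.Collection;
   import org.mockito.invocation.Invocation;

   public class MockInvocationDemo {
       // Hypothetical collaborator, standing in for e.g. an event handler.
       interface Handler {
           void add(byte[] payload);
       }

       public static void main(String[] args) {
           Handler handler = mock(Handler.class);

           // Each call is recorded, including a reference to the 1 KiB argument.
           for (int i = 0; i < 3; i++) {
               handler.add(new byte[1024]);
           }

           // The recorded invocations (and the arguments they reference) stay in memory.
           Collection<Invocation> invocations = mockingDetails(handler).getInvocations();
           System.out.println(invocations.size()); // prints: 3
       }
   }
   ```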



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();

Review Comment:
   I guess I may be less concerned with code duplication in test setup than my 
reviewers :). Done. Added another helper method that contains the lines up to 
the incomplete async commit.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for it to complete (time out)
+        assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately (since offsets are empty)
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() {
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately (since offsets are empty)
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete exceptionally async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().completeExceptionally(new KafkaException("Test exception"));
+
+        // Commit async is completed exceptionally, but this will be handled by commit callback - commit sync should not fail.
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100)));
+    }
+
+    private void testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);

Review Comment:
   It's supposed to give the consumer a valid position, otherwise the async commit doesn't do anything. So yeah, I think that would mean forcing it out of `INITIALIZING`? Or what did you mean by `INITIALIZED`?
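
   To illustrate the point (a plain-consumer sketch, not the mocked test setup; the broker address, group id, and topic are assumed): with manual assignment, the partition only gets a valid position after `seek` (or a successful `poll`), and only then does a no-arg `commitAsync()` have an offset to commit.

   ```java
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;

   public class SeekThenCommitSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092"); // assumed broker
           props.put("group.id", "demo-group");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               TopicPartition tp = new TopicPartition("foo", 0);
               consumer.assign(Collections.singleton(tp));

               // Without this seek the partition has no valid position yet,
               // so a no-arg commitAsync() would have nothing to commit.
               consumer.seek(tp, 20);

               // Now the position is 20, so this commits {foo-0 -> 20}.
               consumer.commitAsync();
           }
       }
   }
   ```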



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) {

Review Comment:
   Done



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() {
         assertFalse(capturedEvent.get().future().isCompletedExceptionally());
     }
 
+    @Test
+    public void testCloseAwaitPendingAsyncCommitIncomplete() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+
+        consumer.commitAsync();
+        Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(10)));
+        assertInstanceOf(TimeoutException.class, e.getCause());

Review Comment:
   True. In this case, we are actually testing `close`. There are other test cases that time out similarly, but for those the timeout is only an artifact and not the goal of the test.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -793,9 +795,9 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
     }
 
     private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
-        maybeThrowFencedInstanceException();
-        maybeInvokeCommitCallbacks();
         maybeThrowInvalidGroupIdException();
+        maybeThrowFencedInstanceException();
+        offsetCommitCallbackInvoker.executeCallbacks();

Review Comment:
   Not needed for correctness. I just noticed that the order in the legacy consumer is this way. I'm not sure there is a strong reason to prefer one order over the other, so I decided to be consistent with the old behavior.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for it to complete (time out)
+        assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately (since offsets are empty)
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() {
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately (since offsets are empty)

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
