kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, 
OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {

Review Comment:
   nit: consider changing `disableWakeup` to `enableWakeup`. Double-negatives 
add nonzero cognitive overhead.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, 
OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+        if (lastPendingAsyncCommit == null) {
+            return;
+        }
+
+        try {
+            final CompletableFuture<Void> futureToAwait = new 
CompletableFuture<>();
+            // We don't want the wake-up trigger to complete our pending async 
commit future,
+            // so create new future here. Any errors in the pending async 
commit will be handled
+            // by the async commit future / the commit callback - here, we 
just want to wait for it to complete.
+            lastPendingAsyncCommit.whenComplete((v, t) -> 
futureToAwait.complete(null));
+            if (!disableWakeup) {
+                wakeupTrigger.setActiveTask(futureToAwait);
+            }
+            ConsumerUtils.getResult(futureToAwait, timer);

Review Comment:
   Is it true that the underlying `lastPendingAsyncCommit` `Future` could 
already be completed by this point, right?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();

Review Comment:
   Can this be replaced with a call to 
`testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I 
glanced back and forth a couple of times and didn't see too much difference:
   
   ```suggestion
           final TopicPartition tp = new TopicPartition("foo", 0);
           testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+        assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   This use of JUnit is just about over my head...
   
   For my own understanding, at which line in this test does the 
`AsyncCommitEvent` get created and enqueued? I would assume at line 634, right?
   
   It looks like you're able to add the `ArgumentCaptor` _after_ the object 
pointed at by the argument was created. Is that correct? 🤔 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() {
         assertFalse(capturedEvent.get().future().isCompletedExceptionally());
     }
 
+    @Test
+    public void testCloseAwaitPendingAsyncCommitIncomplete() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+
+        consumer.commitAsync();
+        Exception e = assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ofMillis(10)));
+        assertInstanceOf(TimeoutException.class, e.getCause());

Review Comment:
   So this is where it may make sense to have special case to close and clear 
out the `consumer` instance, per the previous comment.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+        assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately 
(since offsets are empty)

Review Comment:
   This comment legit confused me for a solid minute because it incorrectly 
states the offsets are empty when they're not 😆
   
   ```suggestion
           // Commit async is completed, so commit sync does not need to wait 
before committing its offsets
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+        assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+        assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete exceptionally async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().completeExceptionally(new KafkaException("Test 
exception"));
+
+        // Commit async is completed exceptionally, but this will be handled 
by commit callback - commit sync should not fail.
+        assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+    }
+
+    private void 
testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);

Review Comment:
   Is the `seek()` here to force the partition out of the `INITIALIZED` state 
in `SubscriptionState`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest {
     public void resetAll() {
         backgroundEventQueue.clear();
         if (consumer != null) {
-            consumer.close(Duration.ZERO);
+            try {
+                consumer.close(Duration.ZERO);
+            } catch (Exception e) {
+                assertInstanceOf(KafkaException.class, e);
+            }

Review Comment:
   Is this `try`/`catch` needed because the test leaves the `consumer` in a bad 
state?
   
   If so, the approach I took was to check for that state in the test itself. I 
faced a similar issue with tests that (intentionally) fenced consumers, so I 
closed the consumer at the end of the test method like so:
   
   ```java
   // Close the consumer here as we know it will cause a 
FencedInstanceIdException to be thrown.
   // If we get an error *other* than the FencedInstanceIdException, we'll fail 
the test.
   try {
       consumer.close();
       fail("Fenced consumer should have thrown a {} on close", 
FencedInstanceIdException.class.getSimpleName());
   } catch (KafkaException e) {
       assertNotNull(e.getCause());
       assertInstanceOf(FencedInstanceIdException.class, e.getCause());
   } finally {
       consumer = null;
   }
   ```
   
   I considered adding a similar catch-all approach in `resetAll()`, but was 
concerned that it might unnecessarily mask issues.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+        assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));

Review Comment:
   Very good test case: committing empty offsets _still_ waits. 👍 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -793,9 +795,9 @@ public void commitAsync(Map<TopicPartition, 
OffsetAndMetadata> offsets, OffsetCo
     }
 
     private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
-        maybeThrowFencedInstanceException();
-        maybeInvokeCommitCallbacks();
         maybeThrowInvalidGroupIdException();
+        maybeThrowFencedInstanceException();
+        offsetCommitCallbackInvoker.executeCallbacks();

Review Comment:
   I had thought to ask before, is this reordering needed for correctness? I 
assume the order of checking for the _fenced_ case and _invalid group ID_ cases 
don't matter. But I assume that we wanted to make sure to perform those checks 
before executing the callbacks?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
         assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
     }
 
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+        time = new MockTime(1);
+        consumer = newConsumer();
+
+        // Commit async (incomplete)
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        consumer.assign(Collections.singleton(tp));
+        consumer.seek(tp, 20);
+        consumer.commitAsync();
+
+        // Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+        assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+        // Complete async commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+        assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().complete(null);
+
+        // Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+        assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+    }
+
+    @Test
+    public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+        final TopicPartition tp = new TopicPartition("foo", 0);
+        testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+        // Complete exceptionally async commit event and sync commit event
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().completeExceptionally(new KafkaException("Test 
exception"));
+
+        // Commit async is completed exceptionally, but this will be handled 
by commit callback - commit sync should not fail.

Review Comment:
   Cool! I was wondering about this case, but (of course) you've got it covered 
😄



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
         }
     }
 
-    private void maybeInvokeCommitCallbacks() {
-        offsetCommitCallbackInvoker.executeCallbacks();
-    }
-

Review Comment:
   I do not insist 😄 I was just curious and I agree that it doesn't odd 
anything in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to