kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration } } + private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { Review Comment: nit: consider changing `disableWakeup` to `enableWakeup`. Double negatives add nonzero cognitive overhead. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration } } + private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { + if (lastPendingAsyncCommit == null) { + return; + } + + try { + final CompletableFuture<Void> futureToAwait = new CompletableFuture<>(); + // We don't want the wake-up trigger to complete our pending async commit future, + // so create new future here. Any errors in the pending async commit will be handled + // by the async commit future / the commit callback - here, we just want to wait for it to complete. + lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null)); + if (!disableWakeup) { + wakeupTrigger.setActiveTask(futureToAwait); + } + ConsumerUtils.getResult(futureToAwait, timer); Review Comment: Could the underlying `lastPendingAsyncCommit` `Future` already be completed by this point?
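On the question above — whether `lastPendingAsyncCommit` could already be done when the callback is attached — a minimal stdlib sketch suggests the pattern is safe either way: `CompletableFuture.whenComplete` fires the callback immediately when the source future is already completed, so the fresh `futureToAwait` is completed before the wait even starts. The class and method names here are illustrative, not the Kafka internals.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureAwaitSketch {
    // Returns true if attaching whenComplete to an already-completed future
    // completes the fresh await-future immediately (i.e. no missed signal).
    static boolean awaitAlreadyCompleted() {
        // Stand-in for a pending async commit that finished before commitSync ran.
        CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);

        // Fresh future, so a wakeup cannot complete the pending commit itself.
        CompletableFuture<Void> futureToAwait = new CompletableFuture<>();

        // whenComplete runs the callback synchronously when the source is done.
        pending.whenComplete((v, t) -> futureToAwait.complete(null));
        return futureToAwait.isDone();
    }

    public static void main(String[] args) {
        System.out.println(awaitAlreadyCompleted()); // prints "true"
    }
}
```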
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + consumer.commitAsync(); Review Comment: Can this be replaced with a call to `testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I glanced back and forth a couple of times and didn't see too much difference: ```suggestion final TopicPartition tp = new TopicPartition("foo", 0); testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + consumer.commitAsync(); + + // Commit async is not completed yet, so commit sync should wait for it 
to complete (time out) + assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + + // Complete async commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: This use of Mockito is just about over my head... For my own understanding, at which line in this test does the `AsyncCommitEvent` get created and enqueued? I would assume at line 634, right? It looks like you're able to add the `ArgumentCaptor` _after_ the object pointed at by the argument was created. Is that correct? 🤔 ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() { assertFalse(capturedEvent.get().future().isCompletedExceptionally()); } + @Test + public void testCloseAwaitPendingAsyncCommitIncomplete() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + + consumer.commitAsync(); + Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(10))); + assertInstanceOf(TimeoutException.class, e.getCause()); Review Comment: So this is where it may make sense to have a special case to close and clear out the `consumer` instance, per the previous comment.
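The timeout assertions in these tests all rely on the same behavior: waiting on a still-incomplete future with a bounded timeout surfaces a `TimeoutException`. A minimal stdlib sketch of that behavior follows — the class and method names are mine, and plain `Future.get(timeout, unit)` stands in for Kafka's `ConsumerUtils.getResult(future, timer)` helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class IncompleteCommitTimeoutSketch {
    // Stand-in for commitSync/close waiting on a pending async commit:
    // an incomplete future waited on with a bounded timeout times out.
    static boolean timesOutWhileIncomplete() {
        CompletableFuture<Void> pendingCommit = new CompletableFuture<>();
        try {
            pendingCommit.get(10, TimeUnit.MILLISECONDS);
            return false; // unreachable: nothing ever completes the future
        } catch (TimeoutException e) {
            return true; // the bounded wait expired, as the tests assert
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOutWhileIncomplete()); // prints "true"
    }
}
```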
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + consumer.commitAsync(); + + // Commit async is not completed yet, so commit sync should wait for it to complete (time out) + assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + + // Complete async commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().complete(null); + + // Commit async is completed, so commit sync completes immediately (since offsets are empty) + assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + } + + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { + final TopicPartition tp = new TopicPartition("foo", 0); + testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + + // Complete async commit event and sync commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent 
commitEvent = commitEventCaptor.getValue(); + commitEvent.future().complete(null); + + // Commit async is completed, so commit sync completes immediately (since offsets are empty) Review Comment: This comment legit confused me for a solid minute because it incorrectly states the offsets are empty when they're not 😆 ```suggestion // Commit async is completed, so commit sync does not need to wait before committing its offsets ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + consumer.commitAsync(); + + // Commit async is not completed yet, so commit sync should wait for it to complete (time out) + assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + + // Complete async commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().complete(null); + + // Commit async is completed, so commit sync completes immediately (since offsets are empty) + assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + } + + @Test + public void 
testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { + final TopicPartition tp = new TopicPartition("foo", 0); + testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + + // Complete async commit event and sync commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().complete(null); + + // Commit async is completed, so commit sync completes immediately (since offsets are empty) + assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); + } + + @Test + public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() { + final TopicPartition tp = new TopicPartition("foo", 0); + testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + + // Complete exceptionally async commit event and sync commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().completeExceptionally(new KafkaException("Test exception")); + + // Commit async is completed exceptionally, but this will be handled by commit callback - commit sync should not fail. 
+ assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); + } + + private void testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); Review Comment: Is the `seek()` here to force the partition out of the `INITIALIZED` state in `SubscriptionState`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest { public void resetAll() { backgroundEventQueue.clear(); if (consumer != null) { - consumer.close(Duration.ZERO); + try { + consumer.close(Duration.ZERO); + } catch (Exception e) { + assertInstanceOf(KafkaException.class, e); + } Review Comment: Is this `try`/`catch` needed because the test leaves the `consumer` in a bad state? If so, the approach I took was to check for that state in the test itself. I faced a similar issue with tests that (intentionally) fenced consumers, so I closed the consumer at the end of the test method like so:

```java
// Close the consumer here as we know it will cause a FencedInstanceIdException to be thrown.
// If we get an error *other* than the FencedInstanceIdException, we'll fail the test.
try {
    consumer.close();
    fail("Fenced consumer should have thrown a " + FencedInstanceIdException.class.getSimpleName() + " on close");
} catch (KafkaException e) {
    assertNotNull(e.getCause());
    assertInstanceOf(FencedInstanceIdException.class, e.getCause());
} finally {
    consumer = null;
}
```

I considered adding a similar catch-all approach in `resetAll()`, but was concerned that it might unnecessarily mask issues.
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + consumer.commitAsync(); + + // Commit async is not completed yet, so commit sync should wait for it to complete (time out) + assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); Review Comment: Very good test case: committing empty offsets _still_ waits. 👍 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -793,9 +795,9 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo } private CompletableFuture<Void> commit(final CommitEvent commitEvent) { - maybeThrowFencedInstanceException(); - maybeInvokeCommitCallbacks(); maybeThrowInvalidGroupIdException(); + maybeThrowInvalidGroupIdException(); + maybeThrowFencedInstanceException(); + offsetCommitCallbackInvoker.executeCallbacks(); Review Comment: I had thought to ask before: is this reordering needed for correctness? I assume the order of checking for the _fenced_ case and _invalid group ID_ cases doesn't matter. But I assume that we wanted to make sure to perform those checks before executing the callbacks?
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { + time = new MockTime(1); + consumer = newConsumer(); + + // Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + final TopicPartition tp = new TopicPartition("foo", 0); + consumer.assign(Collections.singleton(tp)); + consumer.seek(tp, 20); + consumer.commitAsync(); + + // Commit async is not completed yet, so commit sync should wait for it to complete (time out) + assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + + // Complete async commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().complete(null); + + // Commit async is completed, so commit sync completes immediately (since offsets are empty) + assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + } + + @Test + public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { + final TopicPartition tp = new TopicPartition("foo", 0); + testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + + // Complete async commit event and sync commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent 
commitEvent = commitEventCaptor.getValue(); + commitEvent.future().complete(null); + + // Commit async is completed, so commit sync completes immediately (since offsets are empty) + assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); + } + + @Test + public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() { + final TopicPartition tp = new TopicPartition("foo", 0); + testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + + // Complete exceptionally async commit event and sync commit event + final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().completeExceptionally(new KafkaException("Test exception")); + + // Commit async is completed exceptionally, but this will be handled by commit callback - commit sync should not fail. Review Comment: Cool! I was wondering about this case, but (of course) you've got it covered 😄 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() { } } - private void maybeInvokeCommitCallbacks() { - offsetCommitCallbackInvoker.executeCallbacks(); - } - Review Comment: I do not insist 😄 I was just curious and I agree that it doesn't add anything in this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org