Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru merged PR #15613: URL: https://github.com/apache/kafka/pull/15613 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572360676 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: Okay got it. Yeah, the state "async commit pending because I do not know the coordinator yet" and the state "async commit is already sent to the coordinator" may cover two different cases. In fact, in the legacy consumer, these are two independent states the async commit can be in. In the async consumer as well, although waiting for the async commit to complete works independently of that state (since the different states are handled by the background thread, and we simplify wait for the future in the foreground thread). ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +634,85 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testIncompleteAsyncCommit(tp); + +final CompletableFuture asyncCommitFuture = getLastEnqueuedEventFuture(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete exceptionally async commit event +asyncCommitFuture.completeExceptionally(new KafkaException("Test exception")); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testIncompleteAsyncCommit(tp); + +final CompletableFuture asyncCommitFuture = getLastEnqueuedEventFuture(); + +// Mock to complete sync event +completeCommitSyncApplicationEventSuccessfully(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
cadonna commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066478507 > > @lucasbru Thanks for the PR! > > The unit tests you added fail in the build and also for me locally. > > Plus, I have a question regarding the integration tests. > > @cadonna Thanks for the review, I hadn't noticed the test failures. Seems the `ArgumentCaptor`s type-matching introduced with Mockito 5 does not work on Java 8. I committed a workaround. Also, I'm both shocked and impressed that you are using Java 8 locally. It has to run at least on Java 8, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
cadonna commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572290394 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: Fair enough, but why is that important? Is the intention that the async commit needs to lookup the group coordinator? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1005,6 +1102,43 @@ public void testNoWakeupInCloseCommit() { assertFalse(capturedEvent.get().future().isCompletedExceptionally()); } +@Test +public void testCloseAwaitPendingAsyncCommitIncomplete() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Review Comment: Do we need this stub? ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). Review Comment: ```suggestion // disabled, or simply because there are no consumed offsets). ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +634,85 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testIncompleteAsyncCommit(tp); + +final CompletableFuture asyncCommitFuture =
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2066392315 > @lucasbru Thanks for the PR! > > The unit tests you added fail in the build and also for me locally. > > Plus, I have a question regarding the integration tests. @cadonna Thanks for the review, I hadn't noticed the test failures. Seems the `ArgumentCaptor`s type-matching introduced with Mockito 5 does not work on Java 8. I committed a workaround. Also, I'm both shocked and impressed that you are using Java 8 locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109503 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount) + +// Try with coordinator known Review Comment: We ar guaranteed to have looked up the coordinator here, because we did request data from it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1572109078 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: We are not guaranteed to have looked up the coordinator here, because we did not request any data from it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
cadonna commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1570932239 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: I do not understand those comments. How do I recognize in the code that the coordinator is not looked up first? And why is that so important? ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569228945 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: Interesting. Good to know. Fuzzy temporal logic 🤦♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569232314 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().complete(null); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { +final TopicPartition tp = new TopicPartition("foo", 0); +testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + +// Complete async commit event and sync commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().complete(null); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() { +final TopicPartition tp = new TopicPartition("foo", 0); +testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); + +// Complete exceptionally async commit event and sync commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().completeExceptionally(new KafkaException("Test exception")); + +// Commit async is completed exceptionally, but this will be handled by commit callback - commit sync should not fail. +assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); +} + +private void testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); Review Comment: Yes, that's what I meant 😉 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569231774 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); Review Comment: 😆 When I see duplicate (or nearly duplicated) code, my brain turns up its sensitivity because I assume there's some devil-in-the-details reason that the code wasn't reused. I'd assumed that it could be ever-so-slightly refactored into using `testSyncCommitTimesoutAfterIncompleteAsyncCommit()`, but no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1569226961 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -793,9 +795,9 @@ public void commitAsync(Map offsets, OffsetCo } private CompletableFuture commit(final CommitEvent commitEvent) { -maybeThrowFencedInstanceException(); -maybeInvokeCommitCallbacks(); maybeThrowInvalidGroupIdException(); +maybeThrowFencedInstanceException(); +offsetCommitCallbackInvoker.executeCallbacks(); Review Comment: Works for me. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1568770827 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +final CompletableFuture futureToAwait = new CompletableFuture<>(); +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. Any errors in the pending async commit will be handled +// by the async commit future / the commit callback - here, we just want to wait for it to complete. +lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null)); +if (!disableWakeup) { +wakeupTrigger.setActiveTask(futureToAwait); +} +ConsumerUtils.getResult(futureToAwait, timer); Review Comment: Yes. I think if `lastPendingAsyncCommit` is completed before entering here, the `whenComplete` will execute immediately. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest { public void resetAll() { backgroundEventQueue.clear(); if (consumer != null) { -consumer.close(Duration.ZERO); +try { +consumer.close(Duration.ZERO); +} catch (Exception e) { +assertInstanceOf(KafkaException.class, e); +} Review Comment: `resetAll` isn't supposed to test anything, so this also shouldn't mask anything. It's purely for cleanup. In this case, it only affects two tests that will timeout on close (since we don't mock an async commit response). So let me do it. But in general, I wonder if adding clean-up logic to the tests itself won't reduce readability/clarity of the actual test case. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: Yes. I agree, it's a bit weird, but mockito is recording those invocations with copies (references) of all arguments. That's also why spying on lots of objects in busy event loops will accumulate lots of such "invocation objects" in memory. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); Review Comment: I guess I may be less concerned with code duplication in test setup than my reviewers :). Done. Added another helper method that contains the lines up to the incomplete async commit. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerT
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { Review Comment: nit: consider changing `disableWakeup` to `enableWakeup`. Double-negatives add nonzero cognitive overhead. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +final CompletableFuture futureToAwait = new CompletableFuture<>(); +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. Any errors in the pending async commit will be handled +// by the async commit future / the commit callback - here, we just want to wait for it to complete. +lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null)); +if (!disableWakeup) { +wakeupTrigger.setActiveTask(futureToAwait); +} +ConsumerUtils.getResult(futureToAwait, timer); Review Comment: Is it true that the underlying `lastPendingAsyncCommit` `Future` could already be completed by this point, right? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); Review Comment: Can this be replaced with a call to `testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I glanced back and forth a couple of times and didn't see too much difference: ```suggestion final TopicPartition tp = new TopicPartition("foo", 0); testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: This use of JUnit is just about over my head... For my own understanding, at which line in this test does the `AsyncCommitEvent` get created and enqueued? I would assume at line 634, right? It looks like you're able to add the `ArgumentCaptor` _after_ the object pointed at by the argument was created. Is that correct? 🤔 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() { assertFalse(capturedEvent.get().future().isCompletedExceptionally()); } +@Test +public void
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2056077615 @lianetm thanks for the comments, addressed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2056076747 @cadonna Could you please review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1561902955 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,33 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +final CompletableFuture futureToAwait = new CompletableFuture<>(); +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. Any errors in the pending async commit will be handled +// by the async commit future / the commit callback - here, we just want to wait for it to complete. +lastPendingAsyncCommit.whenComplete((v, t) -> { Review Comment: nit: we could loose the braces and inline this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1561901442 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,90 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().complete(null); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Mock to complete sync event +completeCommitSyncApplicationEventSuccessfully(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); + +// Complete async commit event and sync commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); +final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); +commitEvent.future().complete(null); + +// Commit async is completed, so commit sync completes immediately (since offsets are empty) +assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); +} + +@Test +public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Mock to complete sync event +completeCommitSyncApplicationEventSuccessfully(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); Review Comment: This section seems to be doing exactly the same as in the initial section of the previous test? maybe clearer to encapsulate in a common `testSyncCommitTimesoutAfterIncompleteAsyncCommit`, and then each tests proceeds completing the commits its way? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1561212534 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -984,6 +984,8 @@ public void close(final Timer timer) { } } finally { super.close(timer); +// Super-class close may wait for more commit callbacks to complete. +invokeCompletedOffsetCommitCallbacks(); Review Comment: You're right! (silly me, I was probably thinking about a duration arg instead of a timer), all good then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1560885552 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() { } } -private void maybeInvokeCommitCallbacks() { -offsetCommitCallbackInvoker.executeCallbacks(); -} - Review Comment: For me, abstracting this one-liner is more obfuscating than it is helping, but if you insist, I can bring it back. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,37 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +CompletableFuture futureToAwait; +if (!disableWakeup) { +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. +futureToAwait = new CompletableFuture<>(); +lastPendingAsyncCommit.whenComplete((v, t) -> { +if (t != null) { +futureToAwait.completeExceptionally(t); +} else { +futureToAwait.complete(v); +} +}); +wakeupTrigger.setActiveTask(futureToAwait); +} else { +futureToAwait = lastPendingAsyncCommit; +} +ConsumerUtils.getResult(futureToAwait, timer); +lastPendingAsyncCommit = null; +} finally { +if (!disableWakeup) wakeupTrigger.clearTask(); +timer.update(); +} Review Comment: I think always clearing it in `finally` would mean that `lastPendingAsyncCommit` is cleared even though we timed out or were woken up while waiting for it. However, this brought up another issue - what happens when the async commit future completes exceptionally? We'd throw the exception here, but we shouldn't - the error will be handled inside the future. So basically here we want to wait for the async commit, not worrying about return value or exception. And then, the only cases why we fail here should be wake-up or time out, and in both cases, we should check again for the future to be completed the next time we trigger commit sync. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest { @AfterEach public void resetAll() { backgroundEventQueue.clear(); -if (consumer != null) { +try { consumer.close(Duration.ZERO); +} catch (Exception e) { +// ignore Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest { @AfterEach public void resetAll() { backgroundEventQueue.clear(); -if (consumer != null) { +try { consumer.close(Duration.ZERO); +} catch (Exception e) { +// ignore Review Comment: I'm a little leery about swallowing the exception here. Can we validate the exception type is something we expect? e.g.: ```suggestion } catch (Exception e) { assertInstanceOf(KafkaException.class, e); ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,37 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +CompletableFuture futureToAwait; +if (!disableWakeup) { +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. +futureToAwait = new CompletableFuture<>(); +lastPendingAsyncCommit.whenComplete((v, t) -> { +if (t != null) { +futureToAwait.completeExceptionally(t); +} else { +futureToAwait.complete(v); +} +}); +wakeupTrigger.setActiveTask(futureToAwait); +} else { +futureToAwait = lastPendingAsyncCommit; +} +ConsumerUtils.getResult(futureToAwait, timer); +lastPendingAsyncCommit = null; +} finally { +if (!disableWakeup) wakeupTrigger.clearTask(); +timer.update(); +} Review Comment: Do we want to clear out the `lastPendingAsyncCommit` in the `finally` block: ```suggestion ConsumerUtils.getResult(futureToAwait, timer); } finally { lastPendingAsyncCommit = null; if (!disableWakeup) wakeupTrigger.clearTask(); timer.update(); } ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() { } } -private void maybeInvokeCommitCallbacks() { -offsetCommitCallbackInvoker.executeCallbacks(); -} - Review Comment: Any reason we don't want to keep this method abstraction? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2047945261 Hey @lianetm. I split the PR into two, the changes for the legacy consumer go into https://github.com/apache/kafka/pull/15693. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org