Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru merged PR #14937: URL: https://github.com/apache/kafka/pull/14937 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1431330516

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:

```diff
@@ -169,6 +184,35 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) {
     );
 }

+private AsyncKafkaConsumer newConsumer(
+        FetchBuffer fetchBuffer,
+        ConsumerInterceptors interceptors,
+        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
+        SubscriptionState subscriptions,
+        List assignors,
+        String groupId,
+        String clientId) {
+    return new AsyncKafkaConsumer<>(
```

Review Comment:
@philipnee I removed this from the test file on purpose, so that the tests use the normal constructor instead of just testing a mock construction of the class. There may have been a better way than reintroducing the constructor I removed; however, since we are otherwise not converging on this PR, I am going to merge it. Please consider following up with a PR that removes this constructor again.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1862111535

Hey @lucasbru - I think your patch fixed the long-running/OOM issue with the async consumer test. The tests finished within 3.5 hours in this commit: [1786208](https://github.com/apache/kafka/pull/14937/commits/17862086b63864a8ad685f822e565da8626e8ea3). However, there are still quite a few flaky tests in `PlaintextConsumerTest`, namely:

- `testExpandingTopicSubscriptions`
- `testShrinkingTopicSubscriptions`
- `testFetchOutOfRangeOffsetResetConfigLatest`

They are observed sparsely in other builds I've seen, so I'm disabling them in the latest commit: [11a3ae6](https://github.com/apache/kafka/pull/14937/commits/11a3ae633ac551efcf342aa9cb32bd57a1b44314)
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1862105547

Hey @lucasbru - it seems your fix resolved the long-running/OOM issue with the test. `testExpandingTopicSubscriptions` is on the flaky side; I've observed it in other PRs as well.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859230526 Thanks @lucasbru
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859087131

@philipnee I think part of the reason things are running particularly badly this weekend might be a change in the transaction tests. I proposed a revert here: https://github.com/apache/kafka/pull/15029. I will retrigger the CI for this PR for now. Let's see if we get the revert PR merged; then we can rebase this PR and hopefully get stable runs again.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859081002

@philipnee If tests that are flaky in the PR are also flaky on trunk, I don't think that blocks the merge, given the current state of the CI. The problem is that the builds fail completely, so there is no way to tell whether this PR introduces new problems. I think this may also be caused by other changes on trunk. For now, I think we have to restart CI and hope the situation improves, or debug the build failures ourselves. I looked into it briefly (checking the last few runs on trunk), and it seemed that a lot of Gradle executor failures followed the execution of `TransactionsWithTieredStoreTest`.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859051822

Hey @lucasbru - any idea how to get a clean(er) build? I think this is my 37th attempt to restart the build. One cause is that trunk merged in some flaky tests, which are addressed here: https://github.com/apache/kafka/pull/15025. Looking at the history, trunk hasn't been very healthy for the past week. I wonder if we should merge that PR first, then perhaps this one.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer URL: https://github.com/apache/kafka/pull/14937
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1856643470 reopen for testing
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer URL: https://github.com/apache/kafka/pull/14937
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1426388734

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:

```diff
@@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
     final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
     props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
     final ConsumerConfig config = new ConsumerConfig(props);
-    try (final AsyncKafkaConsumer consumer =
-            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-        final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
-
-        assertEquals(groupId, groupMetadata.groupId());
-        assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
-        assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-    }
+    final AsyncKafkaConsumer consumer =
+        new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer());
+    final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+    assertEquals(groupId, groupMetadata.groupId());
+    assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
+    assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+    assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
+    consumer.close(Duration.ZERO);
 }

 @Test
 public void testGroupMetadataUpdateSingleCall() {
     final String groupId = "consumerGroupA";
     final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
     final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>();
-    try (final AsyncKafkaConsumer consumer =
```

Review Comment:
I didn't mean to revert the changes, but to discard them when we merge with the AsyncKafkaConsumer refactoring. Is it possible these changes were required to avoid running OOM? One option would be to rebase this PR on the refactoring, which could possibly resolve these OOMs.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1855386057

This looks quite suspect:

```
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.newNode(HashMap.java:1774)
    at java.util.HashMap.putVal(HashMap.java:632)
    at java.util.HashMap.put(HashMap.java:613)
    at java.util.HashSet.add(HashSet.java:220)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default$Key$Harmonized.detach(MethodGraph.java:1020)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default$Key$Store$Entry$Resolved.asNode(MethodGraph.java:1536)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default$Key$Store.asGraph(MethodGraph.java:1252)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default.compile(MethodGraph.java:646)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$AbstractBase.compile(MethodGraph.java:519)
    at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.isOverridden(MockMethodAdvice.java:189)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.commit(AsyncKafkaConsumer.java:641)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest.testCommitAsync_UserSuppliedCallback(AsyncKafkaConsumerTest.java:217)
```

This is the only PR with lots of OOM errors right now, so I suspect something is wrong.
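The trace above points at Mockito's bytecode generation inside a test, which is consistent with state accumulating across tests when consumers are never closed. One plausible contributor, when a test is rewritten from try-with-resources to an explicit `close()` after the assertions, is that a failing assertion skips the close entirely. A minimal, hypothetical Java sketch of that failure mode (stand-in class, not Kafka's actual consumer):

```java
// Hypothetical stand-in for a consumer that owns background resources.
// Shows why an explicit close() placed after assertions leaks the
// resource when the test fails, while try-with-resources does not.
public class CloseLeakDemo {
    static int openConsumers = 0;

    static class FakeConsumer implements AutoCloseable {
        FakeConsumer() { openConsumers++; }
        @Override public void close() { openConsumers--; }
    }

    static void testWithExplicitClose(boolean fail) {
        FakeConsumer consumer = new FakeConsumer();
        if (fail) throw new AssertionError("assertion failed mid-test");
        consumer.close(); // never reached when the assertion above fails
    }

    static void testWithTryWithResources(boolean fail) {
        try (FakeConsumer consumer = new FakeConsumer()) {
            if (fail) throw new AssertionError("assertion failed mid-test");
        } // close() runs even when the test body throws
    }

    public static void main(String[] args) {
        try { testWithExplicitClose(true); } catch (AssertionError ignored) { }
        System.out.println("leaked after explicit close: " + openConsumers); // 1

        openConsumers = 0;
        try { testWithTryWithResources(true); } catch (AssertionError ignored) { }
        System.out.println("leaked after try-with-resources: " + openConsumers); // 0
    }
}
```

This is only a sketch of the leak pattern; whether it explains this particular OOM depends on what the refactored tests actually hold onto.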
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1855217598 I rebased to see if that improves the tests.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854478484 For `testFetcherConcurrency`, I'm actually having a hard time replicating the issue locally, so let's see what the test run suggests.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854460882

Sorry about the exceedingly large log, but for documentation purposes: 269/269 tests passed (so I enabled both the generic and consumer protocols).

```
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=zk.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testAssignAndConsumeFromCommittedOffsets(String, String) > testAssignAndConsumeFromCommittedOffsets(String,
```
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer URL: https://github.com/apache/kafka/pull/14937
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854443779 Let me close and reopen the PR to trigger the tests.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854172469

CI ran `testFetcherConcurrency` for more than an hour on two pipelines, and it seems to OOM as well. Can you check whether this PR is related in any way?

https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14937/runs/26/nodes/11/steps/87/log/?start=0
https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14937/runs/26/nodes/9/steps/90/log/?start=0
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854157002 Sorry @lucasbru - I ran the integration tests but forgot to post the results. Let me do that for the record.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1853559479 Restarting build as previous was aborted. @philipnee have you run the integration tests on this one?
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424589872

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:

```diff
@@ -285,44 +286,6 @@ void testEnsureEventsAreCompleted() {
     assertTrue(applicationEventsQueue.isEmpty());
 }

-@Test
```

Review Comment:
Removed because they are irrelevant now.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424585777

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:

```diff
@@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
     final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
     props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
     final ConsumerConfig config = new ConsumerConfig(props);
-    try (final AsyncKafkaConsumer consumer =
-            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-        final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
-
-        assertEquals(groupId, groupMetadata.groupId());
-        assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
-        assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-    }
+    final AsyncKafkaConsumer consumer =
+        new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer());
+    final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+    assertEquals(groupId, groupMetadata.groupId());
+    assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
+    assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+    assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
+    consumer.close(Duration.ZERO);
 }

 @Test
 public void testGroupMetadataUpdateSingleCall() {
     final String groupId = "consumerGroupA";
     final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
     final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>();
-    try (final AsyncKafkaConsumer consumer =
```

Review Comment:
OK, makes sense. I can revert these changes.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424433876

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:

```diff
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) {
     }
 }

+/**
+ * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+private void prepareShutdown(final Timer timer, final AtomicReference firstException) {
+    if (!groupMetadata.isPresent())
+        return;
+    maybeAutoCommitSync(timer, firstException);
+    timer.update();
+    waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException);
+    maybeInvokeCommitCallbacks();
+    maybeRevokePartitions(timer, firstException);
+    waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException);
+}
+
+private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event,
+                                   final Timer timer,
+                                   final AtomicReference firstException) {
+    try {
+        applicationEventHandler.addAndGet(event, timer);
+    } catch (TimeoutException e) {
```

Review Comment:
@kirktrue and I discussed the potential tasks for dealing with a zero timeout. This needs to be examined, perhaps after the preview, so we will spin off a Jira ticket for this specific issue.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424414983

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:

```diff
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
                        final NetworkClientDelegate networkClientDelegate,
                        final Timer timer) {
     // These are the optional outgoing requests at the
-    List pollResults = requestManagers.stream()
+    requestManagers.stream()
```

Review Comment:
This is not actually a conflict - it is just some changes to how the fetch request manager closes.
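The hunk above drops the intermediate `pollResults` list and streams each manager's close-time requests onward directly. A simplified, hypothetical sketch of that shape, using stand-in names rather than Kafka's actual `RequestManager` or network-client interfaces:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for a "run at close" pass: each present request
// manager gets one final poll, and its outgoing requests are forwarded
// directly instead of being collected into an intermediate list first.
public class RunAtCloseDemo {
    interface RequestManager {
        List<String> pollOnClose();
    }

    static final List<String> sent = new ArrayList<>();

    static void runAtClose(List<Optional<RequestManager>> requestManagers) {
        requestManagers.stream()
                .filter(Optional::isPresent)   // skip managers that are not configured
                .map(Optional::get)
                .map(RequestManager::pollOnClose)
                .forEach(sent::addAll);        // hand requests straight to the network layer
    }

    public static void main(String[] args) {
        List<Optional<RequestManager>> managers = Arrays.asList(
                Optional.of(() -> Arrays.asList("OffsetCommitRequest")),
                Optional.empty(), // e.g. a manager absent for group-less consumers
                Optional.of(() -> Arrays.asList("LeaveGroupRequest")));
        runAtClose(managers);
        System.out.println(sent); // [OffsetCommitRequest, LeaveGroupRequest]
    }
}
```

The design point is small but real: forwarding eagerly avoids holding a snapshot of requests that could go stale while the close sequence is still draining.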
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424413992

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:

```diff
@@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean swallowException) {
     final Timer closeTimer = time.timer(timeout);
     clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
     closeTimer.update();
-
+    // Prepare shutting down the network thread
+    prepareShutdown(closeTimer, firstException);
+    closeTimer.update();
     if (applicationEventHandler != null)
-        closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
-
-    // Invoke all callbacks after the background thread exists in case if there are unsent async
-    // commits
-    maybeInvokeCommitCallbacks();
-
-    closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
+        closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
+    closeTimer.update();
+    // Ensure all async commit callbacks are invoked
```

Review Comment:
We might not even need this comment.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424029239

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:

```diff
@@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean swallowException) {
     final Timer closeTimer = time.timer(timeout);
     clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
     closeTimer.update();
-
+    // Prepare shutting down the network thread
+    prepareShutdown(closeTimer, firstException);
+    closeTimer.update();
     if (applicationEventHandler != null)
-        closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
-
-    // Invoke all callbacks after the background thread exists in case if there are unsent async
-    // commits
-    maybeInvokeCommitCallbacks();
-
-    closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
+        closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
+    closeTimer.update();
+    // Ensure all async commit callbacks are invoked
```

Review Comment:
Misplaced comment.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:

```diff
@@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean swallowException) {
     }
 }

+/**
+ * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+void prepareShutdown(final Timer timer, final AtomicReference firstException) {
+    if (!groupMetadata.isPresent())
+        return;
+    maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+    timer.update();
+    applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+    maybeRevokePartitions(timer, firstException);
+    waitOnCompletion(
+        () -> applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer),
+        "leave group on close", timer, firstException);
+    maybeInvokeCommitCallbacks();
+}
+
+// Visible for testing
+void maybeRevokePartitions(final Timer timer, final AtomicReference firstException) {
+    if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+        return;
+    // TODO: We obviously needs to block until the partition revocation completes.
+    waitOnCompletion(this::invokePartitionRevocationListener, "revoke partitions", timer, firstException);
+    subscriptions.assignFromSubscribed(Collections.emptySet());
+}
+
+// Visible for testing
+void maybeAutoCommitSync(final boolean shouldAutoCommit,
+                         final Timer timer,
+                         final AtomicReference firstException) {
+    if (!shouldAutoCommit)
+        return;
+    waitOnCompletion(() -> {
+        Map allConsumed = subscriptions.allConsumed();
+        log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+        commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+    }, "autoCommitSync", timer, firstException);
+}
+
+// Visible for testing
+void waitOnCompletion(final Runnable function,
+                      final String taskName,
+                      final Timer timer,
+                      final AtomicReference firstException) {
+    try {
+        function.run();
+    } catch (Exception e) {
+        handleException(e, taskName, timer, Optional.of(firstException));
+    } finally {
+        timer.update();
+    }
+}
+
+private void handleException(final Exception e,
+                             final String taskName,
+                             final Timer timer,
+                             final Optional> firstException) {
+    if (e instanceof TimeoutException) {
+        log.debug("Timeout of {}ms expired before the {} operation could complete.", timer.remainingMs(), taskName);
+    } else {
+        Exception exception = e;
+        if (e instanceof ExecutionException)
+            exception = (Exception) e.getCause();
+        if (!firstException.isPresent())
+            log.debug("Failed to execute {} operation due to {}", taskName, exception.getMessage());
+        firstException.get().compareAndSet(null, exception);
+    }
+}
+
+private CompletableFuture invokePartitionRevocationListener() {
```
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1423701094 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId final Properties props = requiredConsumerPropertiesAndGroupId(groupId); props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); final ConsumerConfig config = new ConsumerConfig(props); -try (final AsyncKafkaConsumer consumer = -new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - -final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); - -assertEquals(groupId, groupMetadata.groupId()); -assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); -assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); -assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); -} +final AsyncKafkaConsumer consumer = +new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()); +final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); +assertEquals(groupId, groupMetadata.groupId()); +assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); +assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); +assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); +consumer.close(Duration.ZERO); } @Test public void testGroupMetadataUpdateSingleCall() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); -try (final AsyncKafkaConsumer consumer = Review Comment: Please be aware that all these changes will be unnecessary/conflicting once we move to mocks, which is rather soon I believe, see: 
https://github.com/apache/kafka/pull/14930 We will get lots of conflicts and I think whoever has to resolve the conflicts will probably just skip the changes from this PR, because the commit logic in the background thread will not be executed anymore in this test.
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1421824192 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -273,80 +250,28 @@ private void closeInternal(final Duration timeout) { } } +private void sendUnsentRequests(final Timer timer) { +// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until +// all requests have received a response. +while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired()) { Review Comment: Well, if all requests are sent, I wouldn't time out. But otherwise, yes.
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1420798994 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -273,80 +250,28 @@ private void closeInternal(final Duration timeout) { } } +private void sendUnsentRequests(final Timer timer) { +// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until +// all requests have received a response. +while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired()) { Review Comment: sorry, to reiterate your comment, perhaps your suggestion is: if time has run out, we do client.poll(0) to try to send and process the requests one last time; if the time hasn't run out and there are still requests to be sent, we continue to poll until all requests are sent or the timer runs out. Is this what you meant?
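The control flow paraphrased above (keep polling while time remains and requests are unsent, then one last best-effort poll once the timer runs out) can be sketched with toy stand-ins. `CloseDrainSketch`, its one-millisecond-per-poll clock, and the string request queue are hypothetical, not the real `NetworkClientDelegate`:

```java
import java.util.Queue;

// Hypothetical stand-in for the close-time drain loop being discussed.
// Each poll() "sends" at most one unsent request.
class CloseDrainSketch {

    static void poll(Queue<String> unsent) {
        if (!unsent.isEmpty()) {
            unsent.poll(); // pretend the request was written to the socket
        }
    }

    // Keep polling while time remains and requests are unsent; when the
    // timer runs out, poll once more as a best effort (the client.poll(0) idea).
    static void drainOnClose(Queue<String> unsent, int remainingMs) {
        while (!unsent.isEmpty() && remainingMs > 0) {
            poll(unsent);
            remainingMs -= 1; // pretend each poll consumes one millisecond
        }
        if (!unsent.isEmpty()) {
            poll(unsent); // final attempt after the timer expired
        }
    }
}
```

With three queued requests and a 10 ms budget the queue drains fully; with five requests and a 2 ms budget, two are sent inside the loop and one more in the final attempt.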
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1420786217 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { Review Comment: noted, let's get rid of runAtClose altogether in a future PR.
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1420557352 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { Review Comment: Seems timer is now completely unused. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -317,7 +326,7 @@ private void process(final GroupMetadataUpdateEvent event) { networkClientDelegateSupplier); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, -applicationEventQueue, +applicationEventQueue, Review Comment: indentation ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -273,80 +250,28 @@ private void closeInternal(final Duration timeout) { } } +private void sendUnsentRequests(final Timer timer) { +// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until +// all requests have received a response. +while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired()) { Review Comment: Closing with timeout 0 would mean we don't send any closing requests, right? I think we should `poll` nevertheless, so we should check the timer at the end. I think if we'd use the normal poll loop as long as `timeout > 0`, this function may not need to check the timer anyway, since it's only used if the time ran out and there are still unsent requests. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -469,7 +481,7 @@ private void process(final GroupMetadataUpdateEvent event) { Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, -applicationEventQueue, +applicationEventQueue, Review Comment: indentation ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose() { +// TODO: move the logic to poll to handle signal close Review Comment: Yes, using the normal poll loop sounds like a good idea. We should still probably `sendUnsentRequests` once when the timeout has passed.
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418128177 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1024,15 +1037,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - +// Prepare shutting down the network thread +prepareShutdown(closeTimer, firstException); Review Comment: Completing the tasks before shutting down the network thread.
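As a rough sketch of that ordering, the group-related work runs to completion before the network thread is torn down. Every name here (`CloseOrderingSketch`, the step strings) is a hypothetical stand-in for the real `AsyncKafkaConsumer` internals, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative ordering only: prepareShutdown() finishes the group work
// while the network thread is still alive, then the thread is closed.
class CloseOrderingSketch {
    final List<String> steps = new ArrayList<>();

    void prepareShutdown() {
        steps.add("autoCommitSync");      // 1. commit consumed offsets
        steps.add("revokePartitions");    // 2. revoke assigned partitions
        steps.add("leaveGroup");          // 3. send the leave-group request
        steps.add("commitCallbacks");     // invoke pending commit callbacks
    }

    void close() {
        prepareShutdown();                // must happen first
        steps.add("closeNetworkThread");  // only then shut the thread down
    }
}
```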
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1846153413 Hi @kirktrue @lucasbru - sorry about the huge PR, but I've addressed most of your comments, if not all (apologies if not). I've left comments in places where a bit more discussion is needed. LMK if you have any questions.
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419688648 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -315,7 +315,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); - +// the network is polled during the last state of clean up. +networkClientDelegate.poll(time.timer(1)); Review Comment: cc @kirktrue
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419688146 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId final Properties props = requiredConsumerPropertiesAndGroupId(groupId); props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); final ConsumerConfig config = new ConsumerConfig(props); -try (final AsyncKafkaConsumer consumer = -new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - -final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); - -assertEquals(groupId, groupMetadata.groupId()); -assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); -assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); -assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); -} +final AsyncKafkaConsumer consumer = +new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()); +final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); +assertEquals(groupId, groupMetadata.groupId()); +assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); +assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); +assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); +consumer.close(Duration.ZERO); } @Test public void testGroupMetadataUpdateSingleCall() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); -try (final AsyncKafkaConsumer consumer = Review Comment: removing the try-with-resources so the consumer can be closed with a 0 timeout. @lucasbru
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419685035 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose() { +// TODO: move the logic to poll to handle signal close Review Comment: I added a method signalClose() to the interface. I wonder if we should keep letting the network thread poll the network client as usual, until we actually invoke close. This means close will do very little except check whether there are any pending requests. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose() { +// TODO: move the logic to poll to handle signal close Review Comment: @kirktrue
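A minimal sketch of that signalClose() idea, under the assumption that the normal poll loop keeps running until close: once the manager is flagged as closing, its regular poll() emits any final request exactly once. `SignalCloseSketch` and the "closeFetchSessions" marker are illustrative, not the real `RequestManager` interface:

```java
import java.util.Collections;
import java.util.List;

// Flag-based close signaling: no special pollOnClose() path, the usual
// poll() hands back the final request(s) after signalClose() was called.
class SignalCloseSketch {
    private boolean closing = false;
    private boolean finalRequestSent = false;

    void signalClose() {
        closing = true;
    }

    // Called repeatedly by the normal poll loop of the network thread.
    List<String> poll() {
        if (closing && !finalRequestSent) {
            finalRequestSent = true;
            return Collections.singletonList("closeFetchSessions");
        }
        return Collections.emptyList();
    }
}
```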
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419683491 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { // These are the optional outgoing requests at the -List pollResults = requestManagers.stream() +requestManagers.stream() Review Comment: see the comment in the FetchRequestManager ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { // These are the optional outgoing requests at the -List pollResults = requestManagers.stream() +requestManagers.stream() Review Comment: @kirktrue
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419683219 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean swallowException) { } } +/** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ +void prepareShutdown(final Timer timer, final AtomicReference firstException) { +if (!groupMetadata.isPresent()) +return; +maybeAutoCommitSync(autoCommitEnabled, timer, firstException); +timer.update(); +applicationEventHandler.add(new CommitOnCloseApplicationEvent()); +maybeRevokePartitions(timer, firstException); +waitOnCompletion( +() -> applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer), +"leave group on close", timer, firstException); +maybeInvokeCommitCallbacks(); +} + +// Visible for testing +void maybeRevokePartitions(final Timer timer, final AtomicReference firstException) { +if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) +return; +// TODO: We obviously needs to block until the partition revocation completes. 
+waitOnCompletion(this::invokePartitionRevocationListener, "revoke partitions", timer, firstException); +subscriptions.assignFromSubscribed(Collections.emptySet()); +} + +// Visible for testing +void maybeAutoCommitSync(final boolean shouldAutoCommit, + final Timer timer, + final AtomicReference firstException) { +if (!shouldAutoCommit) +return; +waitOnCompletion(() -> { +Map allConsumed = subscriptions.allConsumed(); +log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); +commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); +}, "autoCommitSync", timer, firstException); +} + +// Visible for testing +void waitOnCompletion(final Runnable function, + final String taskName, + final Timer timer, + final AtomicReference firstException) { +try { +function.run(); +} catch (Exception e) { +handleException(e, taskName, timer, Optional.of(firstException)); +} finally { +timer.update(); +} +} + +private void handleException(final Exception e, + final String taskName, + final Timer timer, + final Optional> firstException) { +if (e instanceof TimeoutException) { +log.debug("Timeout of {}ms expired before the {} operation could complete.", timer.remainingMs(), taskName); +} else { +Exception exception = e; +if (e instanceof ExecutionException) +exception = (Exception) e.getCause(); +if (!firstException.isPresent()) +log.debug("Failed to execute {} operation due to {}", taskName, exception.getMessage()); +firstException.get().compareAndSet(null, exception); +} +} + +private CompletableFuture invokePartitionRevocationListener() { +SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); +if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) +return CompletableFuture.completedFuture(null); +// TODO: Invoke rebalanceListener via KAFKA-15276 Review Comment: @kirktrue - I am not 100% sure what is the right way to invoke 
the listener. Are we returning a CompletableFuture? The current implementation blocks on listener invocation, which means we need to do future.get(forever). If the listener is broken in some way, then we are stuck here.
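One way out of the future.get(forever) concern is to bound the wait by the close timer's remaining time, so a broken listener costs at most the close timeout. This is a hedged sketch; `awaitQuietly` is a hypothetical helper and nothing here reflects the real rebalance-listener plumbing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Bounded wait on a listener future: returns true if it completed in time,
// false on timeout or failure, but never blocks forever.
class BoundedWaitSketch {

    static boolean awaitQuietly(CompletableFuture<Void> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;   // listener completed within the budget
        } catch (TimeoutException e) {
            return false;  // give up instead of being stuck here
        } catch (Exception e) {
            return false;  // record/propagate elsewhere as the first exception
        }
    }
}
```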
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419350301 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } } void cleanup() { +log.trace("Closing the consumer network thread"); +Timer timer = time.timer(closeTimeout); try { -log.trace("Closing the consumer network thread"); -Timer timer = time.timer(closeTimeout); -maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); -maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { +networkClientDelegate.awaitPendingRequests(timer); Review Comment: I think we actually don't need this here, because after checking the code, runAtClose is already closing. If the timer runs out, then we don't need to poll again; if all requests are completed before the timer runs out, then we don't need to re-poll again.
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419350301 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { +log.trace("Closing the consumer network thread"); +Timer timer = time.timer(closeTimeout); try { -log.trace("Closing the consumer network thread"); -Timer timer = time.timer(closeTimeout); -maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); -maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { +networkClientDelegate.awaitPendingRequests(timer); Review Comment: it seems like there are network polls in both runAtClose and here. We really only need to poll in a single place. I wonder if we could just remove the polls in runAtClose() and try to send out the requests here.
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419342099 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -194,11 +189,11 @@ static void runAtClose(final Collection> requ // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until // all requests have received a response. -do { +while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { Review Comment: if the close timer has expired, should we proceed with closing without sending the requests? I'm undecided on this. @kirktrue wdyt?
lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418972399 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) { } } +/** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ +private void prepareShutdown(final Timer timer, final AtomicReference firstException) { +if (!groupMetadata.isPresent()) +return; +maybeAutoCommitSync(timer, firstException); +timer.update(); +waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException); +maybeInvokeCommitCallbacks(); +maybeRevokePartitions(timer, firstException); +waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException); +} + +private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event, + final Timer timer, + final AtomicReference firstException) { +try { +applicationEventHandler.addAndGet(event, timer); +} catch (TimeoutException e) { +log.debug("Timeout of {}ms expired before the {} operation could complete.", +timer.remainingMs(), +event.task()); +} catch (Exception e) { +firstException.compareAndSet(null, e); +} finally { +timer.update(); +} +} + +private void maybeRevokePartitions(final Timer timer, final AtomicReference firstException) { +if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) +return; +try { +// If the consumer is in a group, we will pause and revoke all assigned partitions +onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); +} catch (Exception e) { +Exception exception = e; +if (e instanceof ExecutionException) +exception = (Exception) e.getCause(); 
+firstException.compareAndSet(null, exception); +} finally { +subscriptions.assignFromSubscribed(Collections.emptySet()); +timer.update(); +} +} + +private void maybeAutoCommitSync(final Timer timer, final AtomicReference firstException) { +if (autoCommitEnabled) { +Map allConsumed = subscriptions.allConsumed(); +try { +log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); +commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); +} catch (TimeoutException e) { +log.debug("Timeout of {}ms expired before the auto commit could complete.", +timer.remainingMs()); +} catch (Exception e) { +// consistent with async auto-commit failures, we do not propagate the exception +log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); +firstException.compareAndSet(null, e); +} +} +} + +private CompletableFuture onLeavePrepare() { Review Comment: not sure if that is the best name to describe what it does ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent event) { event.chain(future); } +private void processPrepClosingEvent(ConsumerCloseApplicationEvent event) { +switch (event.task()) { +case COMMIT: +log.debug("Sending unsent commit before closing."); +sendUnsentCommit(); +event.future().complete(null); Review Comment: Yeah, as long as our timeout did not expire, we probably want to wait for the response, right? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { +log.trace("Closing the consumer network thread"); +Timer timer = time.timer(closeTimeout); try { -log.trace("Closing the consumer network thread"); -Timer timer = time.timer(closeTimeout); -maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); -maybeLeaveGroup(timer); }
kirktrue commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418183946 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { +log.trace("Closing the consumer network thread"); +Timer timer = time.timer(closeTimeout); try { -log.trace("Closing the consumer network thread"); -Timer timer = time.timer(closeTimeout); -maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); -maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { +networkClientDelegate.awaitPendingRequests(timer); Review Comment: Network requests are tied to the `CompletableApplicationEvent`s, right? Can we just rely on the events to wait for their network I/O to complete via the `addAndGet()` method? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.clients.consumer.internals.events; + +public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent { Review Comment: Why not just have separate event types as per the rest of the codebase? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent { Review Comment: I'm happy to have a superclass for 'close' events, but having a type and a task gets a bit muddy, doesn't it? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -45,15 +46,19 @@ public class ApplicationEventProcessor extends EventProcessor private final Logger log; private final ConsumerMetadata metadata; private final RequestManagers requestManagers; +private final NetworkClientDelegate networkClientDelegate; Review Comment: I'm uncomfortable with introducing the `NetworkClientDelegate` at this layer. 
It's centralized in `ConsumerNetworkThread` so that we can reason about where the various network I/O is performed. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -113,6 +118,10 @@ public void process(ApplicationEvent event) { process((UnsubscribeApplicationEvent) event); return; +case PREP_CLOSING: +processPrepClosingEvent((ConsumerCloseApplicationEvent) event); +return; + Review Comment: Any reason we can't have these as separate types like the other events? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent event) { event.chain(future); }
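The design question in these review comments, one `ConsumerCloseApplicationEvent` carrying a task discriminator versus a separate event type per close action, can be sketched outside of Kafka. This is a minimal model; every class name below is hypothetical and deliberately simplified, not the real Kafka internals:

```java
import java.util.concurrent.CompletableFuture;

// A completable event the application thread can block on, loosely
// modeled on the CompletableApplicationEvent idea discussed above.
abstract class AppEvent {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

// Option A (the PR's shape): one close event with an inner task enum.
class CloseEvent extends AppEvent {
    enum Task { PREP_CLOSING, LEAVE_GROUP }
    final Task task;
    CloseEvent(Task task) { this.task = task; }
}

// Option B (the reviewer's suggestion): a separate type per action,
// matching how the rest of the event hierarchy dispatches.
class CommitOnCloseEvent extends AppEvent { }
class LeaveGroupOnCloseEvent extends AppEvent { }

class EventProcessorSketch {
    // With option B, dispatch stays uniform: one branch per concrete type.
    // Option A forces a second branch on the inner Task enum.
    String process(AppEvent event) {
        if (event instanceof CommitOnCloseEvent) return "commitOnClose";
        if (event instanceof LeaveGroupOnCloseEvent) return "leaveGroup";
        if (event instanceof CloseEvent) return "close:" + ((CloseEvent) event).task;
        return "unknown";
    }
}
```

The "muddy" point in the review is visible here: with option A, every consumer of `CloseEvent` must branch again on `Task`, while with option B the type itself is the discriminator.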
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418129061 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { Review Comment: There's really not much to do when shutting down the network thread - we try one more time to send the unsent requests and poll the network client to make sure all requests are sent -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
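The cleanup flow described in this comment, one last attempt to hand unsent requests to the client followed by polling until they complete or the close timer expires, can be sketched as follows. The `Client` interface, the deque of unsent requests, and the explicit millisecond clock are simplified stand-ins, not the real `NetworkClientDelegate` API:

```java
import java.util.Deque;

// Hedged sketch: flush remaining work during network-thread shutdown.
class ShutdownFlusher {

    interface Client {
        void send(String request);      // hand a request to the network layer
        int inFlightCount();            // requests sent but not yet completed
        void poll(long timeoutMs);      // drive network I/O
    }

    static int flushOnClose(Client client, Deque<String> unsent, long deadlineMs, long nowMs) {
        int sent = 0;
        // One last attempt to send everything still queued...
        while (!unsent.isEmpty()) {
            client.send(unsent.poll());
            sent++;
        }
        // ...then poll the client until the deadline or all requests complete.
        while (nowMs < deadlineMs && client.inFlightCount() > 0) {
            client.poll(deadlineMs - nowMs);
            nowMs += 1; // a real implementation would re-read the clock here
        }
        return sent;
    }
}
```

Note the bounded second loop: even if some request never completes, the close deadline caps how long shutdown can block.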
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1418128177 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1024,15 +1037,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - +// Prepare shutting down the network thread +prepareShutdown(closeTimer, firstException); Review Comment: Completing the tasks before shutting down the network thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
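The ordering this comment points at, driving the prepare-shutdown work to completion via an event future before tearing down the network thread, can be sketched as follows. The names are illustrative, not the actual `AsyncKafkaConsumer` API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hedged sketch: complete background close tasks first, then stop the thread.
class CloseSequence {

    static String close(CompletableFuture<Void> prepShutdown,
                        Runnable stopNetworkThread,
                        long timeoutMs) {
        try {
            // Block until the prepare-shutdown tasks complete, or time out.
            prepShutdown.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Swallow and proceed: close must still tear down the thread.
        }
        // Only after the tasks finish (or the deadline passes) do we stop
        // the network thread.
        stopNetworkThread.run();
        return "closed";
    }
}
```

Swallowing the exception mirrors the `swallowException` flag visible in the quoted `close(Duration timeout, boolean swallowException)` signature: even if the prepare step fails or times out, the network thread is still shut down.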
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on PR #14937: URL: https://github.com/apache/kafka/pull/14937#issuecomment-1843871662 Hi @kirktrue - I rewrote the previous PR based on your feedback. I thought driving the close via an event is a better and clearer pattern, so thanks for the suggestions. Would you have time to take a look at this PR? @lucasbru - Thanks for reviewing the PR - following your suggestion, I've decided to use Kirk's approach to close the consumer. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org