Re: [PR] Refactor closing consumer [kafka]
philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418122441

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##

@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }

+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer, final AtomicReference firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(timer, firstException);
+        timer.update();
+        waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException);
+        maybeInvokeCommitCallbacks();
+        maybeRevokePartitions(timer, firstException);
+        waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException);
+    }
+
+    private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event,
+                                       final Timer timer,
+                                       final AtomicReference firstException) {
+        try {
+            applicationEventHandler.addAndGet(event, timer);
+        } catch (TimeoutException e) {

Review Comment:
We don't really throw timeout exceptions during closing, because if the user tries to close with a 0 duration then all operations will time out. The current implementation just polls, but since we cannot poll the client directly here, we need to wait until the future either completes or times out, and then keep going.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
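The "wait until the future completes or times out, then keep going" behavior described in the review comment above can be sketched in plain Java. This is only an illustration under stated assumptions, not the PR's implementation: the class `CloseStepSketch`, the method `awaitStep`, and the two demo futures are hypothetical names, and the sketch uses `java.util.concurrent` rather than Kafka's internal `Timer` and `ApplicationEventHandler`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Kafka: shows how a close() path can wait on
// each shutdown step without letting a timeout abort the remaining steps.
final class CloseStepSketch {

    // Waits up to remainingMs for one shutdown step.
    // Returns true if the step finished (successfully or with an error),
    // false if it timed out or the wait was interrupted.
    static boolean awaitStep(Future<?> step,
                             long remainingMs,
                             AtomicReference<Throwable> firstException) {
        try {
            step.get(Math.max(0L, remainingMs), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Deliberately not recorded: with a zero close timeout every step
            // would time out, so a timeout alone should not fail the close.
            return false;
        } catch (ExecutionException e) {
            // A real failure: remember only the first one and keep going.
            firstException.compareAndSet(null, e.getCause());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            firstException.compareAndSet(null, e);
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        long remainingMs = 0L; // mimics closing with a zero duration

        // Stand-ins for the commit and leave-group steps (assumed, for illustration).
        CompletableFuture<Void> commit = new CompletableFuture<>(); // never completes
        CompletableFuture<Void> leaveGroup = new CompletableFuture<>();
        leaveGroup.completeExceptionally(new IllegalStateException("leave group failed"));

        boolean commitDone = awaitStep(commit, remainingMs, firstException);
        boolean leaveDone = awaitStep(leaveGroup, remainingMs, firstException);

        System.out.println("commit finished: " + commitDone);
        System.out.println("leave group finished: " + leaveDone);
        System.out.println("first recorded exception: " + firstException.get());
    }
}
```

In this sketch the pending commit step times out without recording anything, while the failed leave-group step is recorded as the first exception, so a caller could rethrow or log it after all steps have been attempted.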
Re: [PR] Refactor closing consumer [kafka]
philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1417650715

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##

@@ -778,18 +783,23 @@ public void testGroupIdNull() {
     @Test
     public void testGroupIdNotNullAndValid() {
+        // close the default consumer
+        shutDown();

Review Comment:
The test spins up another consumer, so we should shut down the @BeforeEach one.
Re: [PR] Refactor closing consumer [kafka]
lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1843148936

Yes, I think using events is much clearer. @kirktrue do you agree with this approach? Then I'd suggest we close the other PR and continue with this one.
Re: [PR] Refactor closing consumer [kafka]
lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1417532026

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##

@@ -778,18 +783,23 @@ public void testGroupIdNull() {
     @Test
     public void testGroupIdNotNullAndValid() {
+        // close the default consumer
+        shutDown();

Review Comment:
Isn't this going to happen in `afterAll` anyway?
[PR] Refactor closing consumer [kafka]
philipnee opened a new pull request, #14937:
URL: https://github.com/apache/kafka/pull/14937

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)