Re: [PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418122441


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer, final AtomicReference firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(timer, firstException);
+        timer.update();
+        waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException);
+        maybeInvokeCommitCallbacks();
+        maybeRevokePartitions(timer, firstException);
+        waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException);
+    }
+
+    private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event,
+                                       final Timer timer,
+                                       final AtomicReference firstException) {
+        try {
+            applicationEventHandler.addAndGet(event, timer);
+        } catch (TimeoutException e) {

Review Comment:
   We don't really throw timeout exceptions during closing, because if the user 
tries to close with a 0 duration, all operations will time out. The current 
implementation just polls, but since we cannot poll the client directly, we 
need to wait until the future either completes or times out, and then keep going.
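
A minimal sketch of the "wait for the future, swallow the timeout, keep going" idea described above. It is an illustration only, not the `AsyncKafkaConsumer` code: the class `CloseWaitSketch` and the method `awaitQuietly` are hypothetical names, and a plain `CompletableFuture` stands in for the event's future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; names are assumptions, not Kafka APIs.
final class CloseWaitSketch {

    /**
     * Waits up to remainingMs for the future to finish.
     * Returns true only if the future completed normally within the budget.
     * A timeout is swallowed so the close sequence can move on to its next step;
     * any other failure is remembered in firstException if nothing was recorded yet.
     */
    static boolean awaitQuietly(CompletableFuture<?> future,
                                long remainingMs,
                                AtomicReference<Throwable> firstException) {
        try {
            future.get(Math.max(0L, remainingMs), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Expected when closing with a short (or zero) timeout: do not rethrow.
            return false;
        } catch (ExecutionException e) {
            // Keep only the first failure seen during the close sequence.
            firstException.compareAndSet(null, e.getCause());
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and record the interruption.
            Thread.currentThread().interrupt();
            firstException.compareAndSet(null, e);
            return false;
        }
    }
}
```

With a zero budget, an incomplete future simply yields `false` and leaves `firstException` untouched, so a caller can proceed to the next close step instead of failing the whole close.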



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1417650715


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -778,18 +783,23 @@ public void testGroupIdNull() {
 
 @Test
 public void testGroupIdNotNullAndValid() {
+// close the default consumer
+shutDown();

Review Comment:
   The test spins up another consumer, so we should shut down the one created 
in @BeforeEach. 
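
A rough sketch of the lifecycle in question, assuming a hypothetical `FakeConsumer` whose `close()` is idempotent. None of these names come from `AsyncKafkaConsumerTest`; plain methods stand in for the JUnit setup and teardown hooks so the example stays self-contained.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a test class; not the real AsyncKafkaConsumerTest.
final class TestLifecycleSketch {
    // Number of consumers that were created and not yet closed.
    final AtomicInteger openConsumers = new AtomicInteger();

    // Hypothetical resource with an idempotent close().
    final class FakeConsumer implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        FakeConsumer() {
            openConsumers.incrementAndGet();
        }

        @Override
        public void close() {
            // Only the first call releases the resource; later calls are no-ops.
            if (closed.compareAndSet(false, true)) {
                openConsumers.decrementAndGet();
            }
        }
    }

    private FakeConsumer consumer;

    // Stands in for the per-test setup hook that creates the default consumer.
    void setUp() {
        consumer = new FakeConsumer();
    }

    // Stands in for the teardown hook; closes whichever consumer is current.
    void shutDown() {
        if (consumer != null) {
            consumer.close();
        }
    }

    // A test that needs its own consumer closes the default one first,
    // otherwise the default instance would be left open when the field is reassigned.
    void testWithOwnConsumer() {
        shutDown();
        consumer = new FakeConsumer();
    }
}
```

In this sketch, skipping the `shutDown()` call inside the test would leave the default instance open, because teardown only closes the consumer the field currently points to.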






Re: [PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1843148936

   Yes, I think using events is much clearer. @kirktrue do you agree with this 
approach? Then I'd suggest we close the other PR and continue with this one.





Re: [PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1417532026


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -778,18 +783,23 @@ public void testGroupIdNull() {
 
 @Test
 public void testGroupIdNotNullAndValid() {
+// close the default consumer
+shutDown();

Review Comment:
   Isn't this going to happen in `afterAll` anyway?






[PR] Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee opened a new pull request, #14937:
URL: https://github.com/apache/kafka/pull/14937

   

