Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-19 Thread via GitHub


lucasbru merged PR #14937:
URL: https://github.com/apache/kafka/pull/14937





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-19 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1431330516


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -169,6 +184,35 @@ private AsyncKafkaConsumer 
newConsumer(ConsumerConfig config) {
 );
 }
 
+private AsyncKafkaConsumer newConsumer(
+FetchBuffer fetchBuffer,
+ConsumerInterceptors interceptors,
+ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
+SubscriptionState subscriptions,
+List assignors,
+String groupId,
+String clientId) {
+return new AsyncKafkaConsumer<>(

Review Comment:
   @philipnee I removed this from the test file on purpose, so the tests use the normal constructor rather than just exercising a mocked construction of the class. I'm sure there was a better way than reintroducing the constructor I removed; however, since we are otherwise not converging on this PR, I am going to merge it. Please consider following up with a PR that removes this constructor again.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-18 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1862111535

   Hey @lucasbru - I think your patch fixed the long-running/OOM issue with the async consumer tests. The tests finished within 3.5 hours in this commit: [1786208](https://github.com/apache/kafka/pull/14937/commits/17862086b63864a8ad685f822e565da8626e8ea3)
   However, there are still quite a few flaky tests in PlaintextConsumerTest, namely:
   `testExpandingTopicSubscriptions`
   `testShrinkingTopicSubscriptions`
   `testFetchOutOfRangeOffsetResetConfigLatest`
   
   I have also seen them sporadically in other builds, so I'm disabling them in the latest commit: [11a3ae6](https://github.com/apache/kafka/pull/14937/commits/11a3ae633ac551efcf342aa9cb32bd57a1b44314)
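
   For reference, here is a minimal JUnit 5 sketch of temporarily disabling a flaky test while it is investigated. This is illustrative only: the actual changes live in the Scala integration tests, and the ticket reference below is a placeholder, not a real JIRA.

   ```
   import org.junit.jupiter.api.Disabled;
   import org.junit.jupiter.api.Test;

   class FlakySubscriptionTestsSketch {

       // Temporarily skipped; the reason string should point at the tracking
       // ticket so the test is easy to find and re-enable later.
       @Disabled("Flaky - tracked in a follow-up JIRA (placeholder)")
       @Test
       void testExpandingTopicSubscriptions() {
           // test body omitted in this sketch
       }
   }
   ```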





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-18 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1862105547

   Hey @lucasbru - it seems your fix resolved the long-running/OOM issue with the test. `testExpandingTopicSubscriptions` is still on the flaky side; I've observed that in other PRs as well.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-17 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859230526

   Thanks @lucasbru 





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-17 Thread via GitHub


lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859087131

   @philipnee I think part of the reason things are running particularly badly this weekend might be a change in the transaction tests. I proposed a revert here: https://github.com/apache/kafka/pull/15029 and will retrigger the CI for this PR for now. Let's see if we get the revert PR merged; then we can rebase this PR and hopefully get stable runs again.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-17 Thread via GitHub


lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859081002

   @philipnee If tests that are flaky in this PR are also flaky on trunk, I don't think that blocks the merge, given the current state of the CI. The problem is that the builds fail completely, so there is no way to tell whether this PR introduces new problems or not. I think this may also be caused by other changes on trunk. For now, I think we have to restart CI and hope the situation improves, or debug the build failures ourselves. I looked into it briefly (checking the last few runs on trunk), and it seemed to me that a lot of Gradle executor failures followed the execution of `TransactionsWithTieredStoreTest`.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-16 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1859051822

   Hey @lucasbru - any idea how to get a clean(er) build? I think this is my 37th attempt at restarting the build. One cause is that trunk merged in some flaky tests, which are addressed here: https://github.com/apache/kafka/pull/15025 - looking at the history, I think trunk hasn't been very healthy for the past week. I wonder if we should try to merge that PR first and then perhaps this one.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-16 Thread via GitHub


philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer
URL: https://github.com/apache/kafka/pull/14937





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-14 Thread via GitHub


philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer
URL: https://github.com/apache/kafka/pull/14937





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-14 Thread via GitHub


philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer
URL: https://github.com/apache/kafka/pull/14937





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-14 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1856643470

   Reopening for testing.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-14 Thread via GitHub


philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer
URL: https://github.com/apache/kafka/pull/14937





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-14 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1426388734


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -810,89 +854,87 @@ public void 
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
 final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
 props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
 final ConsumerConfig config = new ConsumerConfig(props);
-try (final AsyncKafkaConsumer consumer =
-new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
-
-final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
-
-assertEquals(groupId, groupMetadata.groupId());
-assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
-assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
-assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
-}
+final AsyncKafkaConsumer consumer =
+new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer());
+final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+assertEquals(groupId, groupMetadata.groupId());
+assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
+assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+consumer.close(Duration.ZERO);
 }
 
 @Test
 public void testGroupMetadataUpdateSingleCall() {
 final String groupId = "consumerGroupA";
 final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
 final LinkedBlockingQueue backgroundEventQueue = new 
LinkedBlockingQueue<>();
-try (final AsyncKafkaConsumer consumer =

Review Comment:
   I didn't mean to revert the changes, only to discard them when we merge with the AsyncKafkaConsumer refactoring. Is it possible these changes were required to avoid running OOM?
   
   One option would be to rebase this PR onto the refactoring, which could possibly resolve these OOMs.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-14 Thread via GitHub


lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1855386057

   This looks quite suspect:
   
   ```
   Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.newNode(HashMap.java:1774)
    at java.util.HashMap.putVal(HashMap.java:632)
    at java.util.HashMap.put(HashMap.java:613)
    at java.util.HashSet.add(HashSet.java:220)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default$Key$Harmonized.detach(MethodGraph.java:1020)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default$Key$Store$Entry$Resolved.asNode(MethodGraph.java:1536)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default$Key$Store.asGraph(MethodGraph.java:1252)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$Default.compile(MethodGraph.java:646)
    at net.bytebuddy.dynamic.scaffold.MethodGraph$Compiler$AbstractBase.compile(MethodGraph.java:519)
    at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.isOverridden(MockMethodAdvice.java:189)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.commit(AsyncKafkaConsumer.java:641)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest.testCommitAsync_UserSuppliedCallback(AsyncKafkaConsumerTest.java:217)
   ```
   
   This is the only PR with lots of OOM errors right now, so I suspect there is 
something wrong.
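
   One pattern that shows up in the test diffs later in this thread is closing every consumer a test constructs, using a bounded timeout. Here is a minimal sketch of that pattern; the types and names are hypothetical stand-ins rather than the real test code, and it is not presented as the actual fix for the OOM above.

   ```
   import java.time.Duration;

   class ConsumerCloseInTestsSketch {

       // Hypothetical stand-in for the consumer under test.
       interface TestConsumer {
           void close(Duration timeout);
       }

       static void runWithConsumer(TestConsumer consumer, Runnable testBody) {
           try {
               testBody.run();
           } finally {
               // Always close, even when the test body throws, so background threads
               // and mock bookkeeping do not accumulate across the test class.
               consumer.close(Duration.ZERO);
           }
       }
   }
   ```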





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1855217598

   I rebased to see if that improves the tests.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854478484

   For `testFetcherConcurrency`, I'm actually having a hard time replicating the issue locally, so let's see what the test run suggests.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854460882

   Sorry about the exceedingly large log, but for documentation purposes: 269/269 tests passed (so I enabled both the generic and consumer protocols).
   ```
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testCoordinatorFailover(String, String) > testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testClusterResourceListener(String, String) > testClusterResourceListener(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testSimpleConsumption(String, String) > testSimpleConsumption(String, String).quorum=kraft+kip848.groupProtocol=consumer PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testPartitionsForAutoCreate(String, String) > testPartitionsForAutoCreate(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testShrinkingTopicSubscriptions(String, String) > testShrinkingTopicSubscriptions(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=zk.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testMaxPollIntervalMs(String, String) > testMaxPollIntervalMs(String, String).quorum=kraft+kip848.groupProtocol=generic PASSED
   Gradle Test Run :core:integrationTest > Gradle Test Executor 69 > PlaintextConsumerTest > testAssignAndConsumeFromCommittedOffsets(String, String) > testAssignAndConsumeFromCommittedOffsets(String, 
   ```

Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


philipnee closed pull request #14937: KAFKA-15696: Refactor closing consumer
URL: https://github.com/apache/kafka/pull/14937





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854443779

   Let me close and reopen the PR to trigger the tests.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854172469

   CI ran `testFetcherConcurrency` for more than an hour on two pipelines, and it seems to OOM as well.
   
   Can you check if this PR is in any way related?
   
   
https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14937/runs/26/nodes/11/steps/87/log/?start=0
   
https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14937/runs/26/nodes/9/steps/90/log/?start=0
   
   





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1854157002

   Sorry @lucasbru - I forgot to post the integration test results, but I did run them. Let me do that for the record.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-13 Thread via GitHub


lucasbru commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1853559479

   Restarting the build, since the previous one was aborted. @philipnee, have you run the integration tests on this one?





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424589872


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -285,44 +286,6 @@ void testEnsureEventsAreCompleted() {
 assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test

Review Comment:
   Removed because they are no longer relevant.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424585777


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -810,89 +854,87 @@ public void 
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
 final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
 props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
 final ConsumerConfig config = new ConsumerConfig(props);
-try (final AsyncKafkaConsumer consumer =
-new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
-
-final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
-
-assertEquals(groupId, groupMetadata.groupId());
-assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
-assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
-assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
-}
+final AsyncKafkaConsumer consumer =
+new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer());
+final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+assertEquals(groupId, groupMetadata.groupId());
+assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
+assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+consumer.close(Duration.ZERO);
 }
 
 @Test
 public void testGroupMetadataUpdateSingleCall() {
 final String groupId = "consumerGroupA";
 final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
 final LinkedBlockingQueue backgroundEventQueue = new 
LinkedBlockingQueue<>();
-try (final AsyncKafkaConsumer consumer =

Review Comment:
   OK, makes sense - I can revert these changes.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424433876


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean 
swallowException) {
 }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+private void prepareShutdown(final Timer timer, final 
AtomicReference firstException) {
+if (!groupMetadata.isPresent())
+return;
+maybeAutoCommitSync(timer, firstException);
+timer.update();
+waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, 
timer.remainingMs()), timer, firstException);
+maybeInvokeCommitCallbacks();
+maybeRevokePartitions(timer, firstException);
+waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, 
timer.remainingMs()), timer, firstException);
+}
+
+private void waitOnEventCompletion(final ConsumerCloseApplicationEvent 
event,
+   final Timer timer,
+   final AtomicReference 
firstException) {
+try {
+applicationEventHandler.addAndGet(event, timer);
+} catch (TimeoutException e) {

Review Comment:
   @kirktrue and I discussed the potential tasks for dealing with a zero timeout. This probably needs to be examined after the preview, so we will spin off a Jira ticket for this specific issue.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424414983


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {
 // These are the optional outgoing requests at the
-List pollResults = 
requestManagers.stream()
+requestManagers.stream()

Review Comment:
   This is not actually a conflict - it's just a change to how the fetch request manager closes.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424413992


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean 
swallowException) {
 final Timer closeTimer = time.timer(timeout);
 clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
 closeTimer.update();
-
+// Prepare shutting down the network thread
+prepareShutdown(closeTimer, firstException);
+closeTimer.update();
 if (applicationEventHandler != null)
-closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed to close application event handler with a timeout(ms)=" + 
closeTimer.remainingMs(), firstException);
-
-// Invoke all callbacks after the background thread exists in case if 
there are unsent async
-// commits
-maybeInvokeCommitCallbacks();
-
-closeQuietly(fetchBuffer, "Failed to close the fetch buffer", 
firstException);
+closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
+closeTimer.update();
+// Ensure all async commit callbacks are invoked

Review Comment:
   We might not even need this comment.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424029239


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean 
swallowException) {
 final Timer closeTimer = time.timer(timeout);
 clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
 closeTimer.update();
-
+// Prepare shutting down the network thread
+prepareShutdown(closeTimer, firstException);
+closeTimer.update();
 if (applicationEventHandler != null)
-closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed to close application event handler with a timeout(ms)=" + 
closeTimer.remainingMs(), firstException);
-
-// Invoke all callbacks after the background thread exists in case if 
there are unsent async
-// commits
-maybeInvokeCommitCallbacks();
-
-closeQuietly(fetchBuffer, "Failed to close the fetch buffer", 
firstException);
+closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
+closeTimer.update();
+// Ensure all async commit callbacks are invoked

Review Comment:
   misplaced comment



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean 
swallowException) {
 }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+void prepareShutdown(final Timer timer, final AtomicReference 
firstException) {
+if (!groupMetadata.isPresent())
+return;
+maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+timer.update();
+applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+maybeRevokePartitions(timer, firstException);
+waitOnCompletion(
+() -> applicationEventHandler.addAndGet(new 
LeaveOnCloseApplicationEvent(), timer),
+"leave group on close", timer, firstException);
+maybeInvokeCommitCallbacks();
+}
+
+// Visible for testing
+void maybeRevokePartitions(final Timer timer, final 
AtomicReference firstException) {
+if (!subscriptions.hasAutoAssignedPartitions() || 
subscriptions.assignedPartitions().isEmpty())
+return;
+// TODO: We obviously needs to block until the partition revocation 
completes.
+waitOnCompletion(this::invokePartitionRevocationListener, "revoke 
partitions", timer, firstException);
+subscriptions.assignFromSubscribed(Collections.emptySet());
+}
+
+// Visible for testing
+void maybeAutoCommitSync(final boolean shouldAutoCommit,
+ final Timer timer,
+ final AtomicReference firstException) {
+if (!shouldAutoCommit)
+return;
+waitOnCompletion(() -> {
+Map allConsumed = 
subscriptions.allConsumed();
+log.debug("Sending synchronous auto-commit of offsets {} on 
closing", allConsumed);
+commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+}, "autoCommitSync", timer, firstException);
+}
+
+// Visible for testing
+void waitOnCompletion(final Runnable function,
+  final String taskName,
+  final Timer timer,
+  final AtomicReference 
firstException) {
+try {
+function.run();
+} catch (Exception e) {
+handleException(e, taskName, timer, Optional.of(firstException));
+} finally {
+timer.update();
+}
+}
+
+private void handleException(final Exception e,
+ final String taskName,
+ final Timer timer,
+ final Optional> 
firstException) {
+if (e instanceof TimeoutException) {
+log.debug("Timeout of {}ms expired before the {} operation could 
complete.", timer.remainingMs(), taskName);
+} else {
+Exception exception = e;
+if (e instanceof ExecutionException)
+exception = (Exception) e.getCause();
+if (!firstException.isPresent())
+log.debug("Failed to execute {} operation due to {}", 
taskName, exception.getMessage());
+firstException.get().compareAndSet(null, exception);
+}
+}
+
+private CompletableFuture invokePartitionRevocationListener() {
+

Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-12 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1423701094


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -810,89 +854,87 @@ public void 
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
 final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
 props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
 final ConsumerConfig config = new ConsumerConfig(props);
-try (final AsyncKafkaConsumer consumer =
-new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
-
-final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
-
-assertEquals(groupId, groupMetadata.groupId());
-assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
-assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
-assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
-}
+final AsyncKafkaConsumer consumer =
+new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer());
+final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+assertEquals(groupId, groupMetadata.groupId());
+assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
+assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+consumer.close(Duration.ZERO);
 }
 
 @Test
 public void testGroupMetadataUpdateSingleCall() {
 final String groupId = "consumerGroupA";
 final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
 final LinkedBlockingQueue backgroundEventQueue = new 
LinkedBlockingQueue<>();
-try (final AsyncKafkaConsumer consumer =

Review Comment:
   Please be aware that all of these changes will be unnecessary/conflicting once we move to mocks, which I believe will happen rather soon; see:
   
   https://github.com/apache/kafka/pull/14930
   
   We will get lots of conflicts, and I think whoever has to resolve them will probably just skip the changes from this PR, because the commit logic in the background thread will no longer be executed in this test.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-10 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1421824192


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,80 +250,28 @@ private void closeInternal(final Duration timeout) {
 }
 }
 
+private void sendUnsentRequests(final Timer timer) {
+// Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
+// all requests have received a response.
+while (!networkClientDelegate.unsentRequests().isEmpty() && 
timer.notExpired()) {

Review Comment:
   Well, if all requests are sent, I wouldn't time out. But otherwise, yes.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-08 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1420798994


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,80 +250,28 @@ private void closeInternal(final Duration timeout) {
 }
 }
 
+private void sendUnsentRequests(final Timer timer) {
+// Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
+// all requests have received a response.
+while (!networkClientDelegate.unsentRequests().isEmpty() && 
timer.notExpired()) {

Review Comment:
   Sorry, to reiterate your comment, perhaps your suggestion is: if time has run out, we do client.poll(0) to try to send and process the requests one last time; if time hasn't run out and there are still requests to be sent, we continue to poll until either all requests are sent or the timer runs out. Is this what you meant?
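
   To make the control flow being discussed concrete, here is a self-contained sketch. The `Network` and `Timer` classes below are simplified stand-ins, not the real NetworkClientDelegate or Kafka Timer, and the method names are assumptions for illustration only.

   ```
   import java.util.ArrayDeque;
   import java.util.Queue;

   class SendUnsentRequestsSketch {

       // Simplified stand-in for the network client delegate.
       static class Network {
           final Queue<String> unsentRequests = new ArrayDeque<>();

           void poll(long timeoutMs) {
               // A real poll would flush requests to the socket and read responses;
               // here we just pretend one unsent request is flushed per call.
               unsentRequests.poll();
           }
       }

       // Simplified countdown timer.
       static class Timer {
           private final long deadlineMs;
           Timer(long timeoutMs) { this.deadlineMs = System.currentTimeMillis() + timeoutMs; }
           boolean notExpired() { return System.currentTimeMillis() < deadlineMs; }
           long remainingMs() { return Math.max(0, deadlineMs - System.currentTimeMillis()); }
       }

       // Keep polling while there are unsent requests and time remains, then poll once
       // more with a zero timeout so the last requests still get a chance to go out,
       // even when the method is entered with an already-expired timer.
       static void sendUnsentRequests(Network network, Timer timer) {
           while (!network.unsentRequests.isEmpty() && timer.notExpired()) {
               network.poll(Math.min(timer.remainingMs(), 100));
           }
           network.poll(0);
       }
   }
   ```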






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-08 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1420786217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {

Review Comment:
   Noted - let's get rid of runAtClose altogether in a future PR.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-08 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1420557352


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {

Review Comment:
   Seems timer is now completely unused.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -317,7 +326,7 @@ private void process(final GroupMetadataUpdateEvent event) {
 networkClientDelegateSupplier);
 final Supplier 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
 metadata,
-applicationEventQueue,
+applicationEventQueue,

Review Comment:
   indentation



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,80 +250,28 @@ private void closeInternal(final Duration timeout) {
 }
 }
 
+private void sendUnsentRequests(final Timer timer) {
+// Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
+// all requests have received a response.
+while (!networkClientDelegate.unsentRequests().isEmpty() && 
timer.notExpired()) {

Review Comment:
   Closing with a timeout of 0 would mean we don't send any closing requests, right? I think we should `poll` nevertheless, so we should check the timer at the end.
   
   I think that if we used the normal poll loop as long as `timeout > 0`, this function might not need to check the timer anyway, since it is only used when the time has run out and there are still unsent requests.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -469,7 +481,7 @@ private void process(final GroupMetadataUpdateEvent event) {
 Supplier applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
 logContext,
 metadata,
-applicationEventQueue,
+applicationEventQueue,

Review Comment:
   indentation



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) {
  */
 @Override
 public PollResult pollOnClose() {
+// TODO: move the logic to poll to handle signal close

Review Comment:
   Yes, using the normal poll loop sounds like a good idea. We should probably still call `sendUnsentRequests` once when the timeout has passed.






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418128177


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1024,15 +1037,14 @@ private void close(Duration timeout, boolean 
swallowException) {
 final Timer closeTimer = time.timer(timeout);
 clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
 closeTimer.update();
-
+// Prepare shutting down the network thread
+prepareShutdown(closeTimer, firstException);

Review Comment:
   Completing the tasks before shutting down the network thread. 






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1846153413

   Hi @kirktrue @lucasbru - sorry about the huge PR, but I've addressed most of your comments, if not all (apologies if I missed any). I've left comments in the places where a bit more discussion is needed. Let me know if you have any questions.





Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419688648


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -315,7 +315,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() {
 // the close() method with a Timer will NOT send out the close session 
requests on close. The network
 // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we 
need to run that logic here.
 ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), 
networkClientDelegate, timer);
-
+// the network is polled during the last state of clean up.
+networkClientDelegate.poll(time.timer(1));

Review Comment:
   cc @kirktrue 






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419688146


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -810,89 +854,87 @@ public void 
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
 final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
 props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
 final ConsumerConfig config = new ConsumerConfig(props);
-try (final AsyncKafkaConsumer consumer =
-new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
-
-final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
-
-assertEquals(groupId, groupMetadata.groupId());
-assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
-assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
-assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
-}
+final AsyncKafkaConsumer consumer =
+new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer());
+final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+assertEquals(groupId, groupMetadata.groupId());
+assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
+assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+consumer.close(Duration.ZERO);
 }
 
 @Test
 public void testGroupMetadataUpdateSingleCall() {
 final String groupId = "consumerGroupA";
 final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
 final LinkedBlockingQueue backgroundEventQueue = new 
LinkedBlockingQueue<>();
-try (final AsyncKafkaConsumer consumer =

Review Comment:
   Removing the try-with-resources so the consumer can be closed with a zero timeout. @lucasbru






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419685035


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) {
  */
 @Override
 public PollResult pollOnClose() {
+// TODO: move the logic to poll to handle signal close

Review Comment:
   I added a signalClose() method to the interface. I wonder if we should keep letting the network thread poll the network client as usual until we actually invoke close. That way, close would do very little beyond checking whether there are any pending requests.
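
   A rough sketch of the signalClose() idea under discussion, assuming a RequestManager-style interface with simplified names (this is not the exact PR code): the manager is only told that close has started, and the regular poll loop keeps draining requests until the network thread actually shuts down.

   ```
   import java.util.concurrent.atomic.AtomicBoolean;

   class SignalCloseSketch {

       // Simplified stand-in for the request-manager abstraction.
       interface RequestManager {
           // Called on every iteration of the network thread's poll loop.
           void poll(long currentTimeMs);

           // Notify the manager that close has started; default is a no-op.
           default void signalClose() { }
       }

       static class FetchLikeManager implements RequestManager {
           private final AtomicBoolean closing = new AtomicBoolean(false);

           @Override
           public void poll(long currentTimeMs) {
               if (closing.get()) {
                   // On the closing path, the regular poll builds session-close
                   // requests instead of new fetches.
                   buildCloseSessionRequests();
               } else {
                   buildFetchRequests();
               }
           }

           @Override
           public void signalClose() {
               closing.set(true);
           }

           private void buildFetchRequests() { /* omitted in this sketch */ }
           private void buildCloseSessionRequests() { /* omitted in this sketch */ }
       }
   }
   ```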



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) {
  */
 @Override
 public PollResult pollOnClose() {
+// TODO: move the logic to poll to handle signal close

Review Comment:
   @kirktrue 






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419683491


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {
 // These are the optional outgoing requests at the
-List pollResults = 
requestManagers.stream()
+requestManagers.stream()

Review Comment:
   see the comment in the FetchRequestManager



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {
 // These are the optional outgoing requests at the
-List pollResults = 
requestManagers.stream()
+requestManagers.stream()

Review Comment:
   @kirktrue 






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419683219


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean 
swallowException) {
 }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+void prepareShutdown(final Timer timer, final AtomicReference 
firstException) {
+if (!groupMetadata.isPresent())
+return;
+maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+timer.update();
+applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+maybeRevokePartitions(timer, firstException);
+waitOnCompletion(
+() -> applicationEventHandler.addAndGet(new 
LeaveOnCloseApplicationEvent(), timer),
+"leave group on close", timer, firstException);
+maybeInvokeCommitCallbacks();
+}
+
+// Visible for testing
+void maybeRevokePartitions(final Timer timer, final 
AtomicReference firstException) {
+if (!subscriptions.hasAutoAssignedPartitions() || 
subscriptions.assignedPartitions().isEmpty())
+return;
+// TODO: We obviously needs to block until the partition revocation 
completes.
+waitOnCompletion(this::invokePartitionRevocationListener, "revoke 
partitions", timer, firstException);
+subscriptions.assignFromSubscribed(Collections.emptySet());
+}
+
+// Visible for testing
+void maybeAutoCommitSync(final boolean shouldAutoCommit,
+ final Timer timer,
+ final AtomicReference firstException) {
+if (!shouldAutoCommit)
+return;
+waitOnCompletion(() -> {
+Map allConsumed = 
subscriptions.allConsumed();
+log.debug("Sending synchronous auto-commit of offsets {} on 
closing", allConsumed);
+commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+}, "autoCommitSync", timer, firstException);
+}
+
+// Visible for testing
+void waitOnCompletion(final Runnable function,
+  final String taskName,
+  final Timer timer,
+  final AtomicReference 
firstException) {
+try {
+function.run();
+} catch (Exception e) {
+handleException(e, taskName, timer, Optional.of(firstException));
+} finally {
+timer.update();
+}
+}
+
+private void handleException(final Exception e,
+ final String taskName,
+ final Timer timer,
+ final Optional> 
firstException) {
+if (e instanceof TimeoutException) {
+log.debug("Timeout of {}ms expired before the {} operation could 
complete.", timer.remainingMs(), taskName);
+} else {
+Exception exception = e;
+if (e instanceof ExecutionException)
+exception = (Exception) e.getCause();
+if (!firstException.isPresent())
+log.debug("Failed to execute {} operation due to {}", 
taskName, exception.getMessage());
+firstException.get().compareAndSet(null, exception);
+}
+}
+
+private CompletableFuture invokePartitionRevocationListener() {
+SortedSet droppedPartitions = new 
TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+if (!subscriptions.hasAutoAssignedPartitions() || 
droppedPartitions.isEmpty())
+return CompletableFuture.completedFuture(null);
+// TODO: Invoke rebalanceListener via KAFKA-15276

Review Comment:
   @kirktrue - I am not 100% sure what the right way to invoke the listener is. Are we returning a CompletableFuture? The current implementation blocks on the listener invocation, which means we effectively need to call future.get() with no bound. If the listener is broken in some way, we are stuck here.
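
   One way to avoid blocking forever is to bound the wait with the time remaining on the close timer. Below is a minimal sketch using a plain CompletableFuture; it is illustrative only and not the eventual KAFKA-15276 implementation.

   ```
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   class BoundedListenerWaitSketch {

       // Wait for the revocation callback to finish, but never longer than the
       // time left for closing the consumer.
       static void awaitRevocation(CompletableFuture<Void> revocationFuture, long remainingMs)
               throws InterruptedException, ExecutionException {
           try {
               revocationFuture.get(remainingMs, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               // Best effort: give up and continue closing instead of hanging forever
               // on a listener that never completes.
               System.err.println("Rebalance listener did not complete within " + remainingMs + " ms; continuing close");
           }
       }
   }
   ```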






Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419350301


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {
+    log.trace("Closing the consumer network thread");
+    Timer timer = time.timer(closeTimeout);
     try {
-        log.trace("Closing the consumer network thread");
-        Timer timer = time.timer(closeTimeout);
-        maybeAutocommitOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
-        maybeLeaveGroup(timer);
     } catch (Exception e) {
         log.error("Unexpected error during shutdown.  Proceed with closing.", e);
     } finally {
+        networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   After checking the code, I think we actually don't need this here because
runAtClose already handles it: if the timer runs out, then we don't need to
poll again, and if all requests are completed before the timer runs out, then
we don't need to re-poll either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419350301


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {
+    log.trace("Closing the consumer network thread");
+    Timer timer = time.timer(closeTimeout);
     try {
-        log.trace("Closing the consumer network thread");
-        Timer timer = time.timer(closeTimeout);
-        maybeAutocommitOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
-        maybeLeaveGroup(timer);
     } catch (Exception e) {
         log.error("Unexpected error during shutdown.  Proceed with closing.", e);
     } finally {
+        networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   After checking the code, I think we actually don't need this here because
runAtClose already handles it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419350301


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {
+    log.trace("Closing the consumer network thread");
+    Timer timer = time.timer(closeTimeout);
     try {
-        log.trace("Closing the consumer network thread");
-        Timer timer = time.timer(closeTimeout);
-        maybeAutocommitOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
-        maybeLeaveGroup(timer);
     } catch (Exception e) {
         log.error("Unexpected error during shutdown.  Proceed with closing.", e);
     } finally {
+        networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   It seems like there's a network poll in both runAtClose and here. We really
only need to poll in a single place. I wonder if we could just remove the
polls in runAtClose() and try to send out the requests here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419342099


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -194,11 +189,11 @@ static void runAtClose(final Collection> requ
 
     // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
     // all requests have received a response.
-    do {
+    while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) {

Review Comment:
   if the close timer has expired, should we proceed with closing without 
sending the request? I'm undecided on this. @kirktrue wdyt?
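
   For reference, a minimal plain-Java sketch of the guard being discussed: if
the close timer has already expired, the loop falls through and the close
proceeds without waiting on outstanding requests. The pollOnce and deadline
names are stand-ins, not the actual ConsumerNetworkThread code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CloseTimerGuard {

    // Stand-in for networkClientDelegate.poll(...): drive I/O for a short slice.
    static void pollOnce() {
    }

    public static void main(String[] args) {
        long deadlineNanos = System.nanoTime() + 5_000_000L; // ~5ms close budget
        List<CompletableFuture<Void>> requestFutures =
                Arrays.asList(CompletableFuture.completedFuture(null));

        // Keep polling only while budget remains AND something is still in flight;
        // if the timer has already expired we fall through and close immediately,
        // without waiting on the outstanding requests.
        while (System.nanoTime() < deadlineNanos
                && !requestFutures.stream().allMatch(Future::isDone)) {
            pollOnce();
        }
    }
}
```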



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-07 Thread via GitHub


lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418972399


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) {
     }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+private void prepareShutdown(final Timer timer, final AtomicReference firstException) {
+    if (!groupMetadata.isPresent())
+        return;
+    maybeAutoCommitSync(timer, firstException);
+    timer.update();
+    waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException);
+    maybeInvokeCommitCallbacks();
+    maybeRevokePartitions(timer, firstException);
+    waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException);
+}
+
+private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event,
+                                   final Timer timer,
+                                   final AtomicReference firstException) {
+    try {
+        applicationEventHandler.addAndGet(event, timer);
+    } catch (TimeoutException e) {
+        log.debug("Timeout of {}ms expired before the {} operation could complete.",
+            timer.remainingMs(),
+            event.task());
+    } catch (Exception e) {
+        firstException.compareAndSet(null, e);
+    } finally {
+        timer.update();
+    }
+}
+
+private void maybeRevokePartitions(final Timer timer, final AtomicReference firstException) {
+    if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+        return;
+    try {
+        // If the consumer is in a group, we will pause and revoke all assigned partitions
+        onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+        Exception exception = e;
+        if (e instanceof ExecutionException)
+            exception = (Exception) e.getCause();
+        firstException.compareAndSet(null, exception);
+    } finally {
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        timer.update();
+    }
+}
+
+private void maybeAutoCommitSync(final Timer timer, final AtomicReference firstException) {
+    if (autoCommitEnabled) {
+        Map allConsumed = subscriptions.allConsumed();
+        try {
+            log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+        } catch (TimeoutException e) {
+            log.debug("Timeout of {}ms expired before the auto commit could complete.",
+                timer.remainingMs());
+        } catch (Exception e) {
+            // consistent with async auto-commit failures, we do not propagate the exception
+            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+            firstException.compareAndSet(null, e);
+        }
+    }
+}
+
+private CompletableFuture onLeavePrepare() {

Review Comment:
   not sure if that is the best name to describe what it does



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent event) {
     event.chain(future);
 }
 
+private void processPrepClosingEvent(ConsumerCloseApplicationEvent event) {
+    switch (event.task()) {
+        case COMMIT:
+            log.debug("Sending unsent commit before closing.");
+            sendUnsentCommit();
+            event.future().complete(null);

Review Comment:
   Yeah, as long as our timeout did not expire, we probably want to wait for 
the response, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {
+    log.trace("Closing the consumer network thread");
+    Timer timer = time.timer(closeTimeout);
     try {
-        log.trace("Closing the consumer network thread");
-        Timer timer = time.timer(closeTimeout);
-        maybeAutocommitOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
-        maybeLeaveGroup(timer);
     } 

Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


kirktrue commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418183946


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {
+    log.trace("Closing the consumer network thread");
+    Timer timer = time.timer(closeTimeout);
     try {
-        log.trace("Closing the consumer network thread");
-        Timer timer = time.timer(closeTimeout);
-        maybeAutocommitOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
-        maybeLeaveGroup(timer);
     } catch (Exception e) {
         log.error("Unexpected error during shutdown.  Proceed with closing.", e);
     } finally {
+        networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   Network requests are tied to the `CompletableApplicationEvent`s, right? Can
we just rely on the events to wait for their network I/O to complete via the
`addAndGet()` method?
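
   As a rough sketch of that idea in plain Java (the LeaveGroupEvent type and
queue here are stand-ins, not the actual event classes): the application
thread enqueues a completable event and blocks on its future with a bounded
wait, while the background thread owns the network I/O and completes the
future:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EventBackedClose {

    // Hypothetical completable event: the background thread performs the network
    // I/O and completes the future; the application thread only waits on it.
    static final class LeaveGroupEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<LeaveGroupEvent> queue = new LinkedBlockingQueue<>();

        // Stand-in for the consumer network thread.
        Thread networkThread = new Thread(() -> {
            try {
                LeaveGroupEvent event = queue.take();
                // ... send the leave-group request, poll until the response arrives ...
                event.future.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        networkThread.start();

        // addAndGet-style call from the application thread: enqueue the event, then
        // block (bounded by the close timeout) until the background thread completes it.
        LeaveGroupEvent event = new LeaveGroupEvent();
        queue.add(event);
        event.future.get(1, TimeUnit.SECONDS);
        networkThread.join();
    }
}
```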



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent {

Review Comment:
   Why not just have separate event types as per the rest of the codebase?
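
   A rough plain-Java sketch of what separate event types could look like; the
class names here are illustrative, not the PR's actual events:

```java
import java.util.concurrent.CompletableFuture;

public class SeparateCloseEvents {

    // Hypothetical base class mirroring a completable application event.
    abstract static class CompletableEvent<T> {
        final CompletableFuture<T> future = new CompletableFuture<>();
    }

    // One concrete type per close step, instead of a single event carrying a Task
    // enum; the processor can then dispatch on the event class like the other events.
    static final class CommitOnCloseEvent extends CompletableEvent<Void> { }
    static final class LeaveGroupOnCloseEvent extends CompletableEvent<Void> { }

    static void process(CompletableEvent<?> event) {
        if (event instanceof CommitOnCloseEvent) {
            // flush any unsent commits, then complete
            ((CommitOnCloseEvent) event).future.complete(null);
        } else if (event instanceof LeaveGroupOnCloseEvent) {
            // send the leave-group request, then complete
            ((LeaveGroupOnCloseEvent) event).future.complete(null);
        }
    }

    public static void main(String[] args) {
        process(new CommitOnCloseEvent());
        process(new LeaveGroupOnCloseEvent());
    }
}
```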



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent {

Review Comment:
   I'm happy to have a superclass for 'close' events, but having a type and a 
task gets a bit muddy, doesn't it?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -45,15 +46,19 @@ public class ApplicationEventProcessor extends EventProcessor
     private final Logger log;
     private final ConsumerMetadata metadata;
     private final RequestManagers requestManagers;
+    private final NetworkClientDelegate networkClientDelegate;

Review Comment:
   I'm uncomfortable with introducing the `NetworkClientDelegate` at this 
layer. It's centralized in `ConsumerNetworkThread` for the reason that we can 
reason on where the various network I/O is performed.
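
   A rough sketch of that separation in plain Java, using stand-in classes
rather than the real NetworkClientDelegate and request managers: the processor
only mutates request-manager state, and the network thread remains the single
place that performs I/O:

```java
public class IoOwnershipSketch {

    // Hypothetical collaborators, only to show where the network I/O lives.
    static final class NetworkClientDelegateStub {
        void poll(long timeoutMs) {
            // drive socket I/O
        }
    }

    static final class RequestManagerStub {
        boolean hasUnsentRequests;

        void enqueueLeaveGroup() {
            hasUnsentRequests = true;
        }
    }

    // The event processor only mutates request-manager state...
    static void processLeaveGroupEvent(RequestManagerStub requestManager) {
        requestManager.enqueueLeaveGroup();
    }

    // ...while the network thread stays the single place that performs I/O.
    static void networkThreadIteration(RequestManagerStub rm, NetworkClientDelegateStub nc) {
        if (rm.hasUnsentRequests) {
            nc.poll(10);
            rm.hasUnsentRequests = false;
        }
    }

    public static void main(String[] args) {
        RequestManagerStub rm = new RequestManagerStub();
        NetworkClientDelegateStub nc = new NetworkClientDelegateStub();
        processLeaveGroupEvent(rm);
        networkThreadIteration(rm, nc);
    }
}
```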



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -113,6 +118,10 @@ public void process(ApplicationEvent event) {
             process((UnsubscribeApplicationEvent) event);
             return;
 
+        case PREP_CLOSING:
+            processPrepClosingEvent((ConsumerCloseApplicationEvent) event);
+            return;
+

Review Comment:
   Any reason we can't have these as separate types like the other events?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent event) {
     event.chain(future);
 }
 

Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418129061


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
 }
 
 void cleanup() {

Review Comment:
   Really not much to do when shutting down the network thread - we will try
one more time to send the unsent requests and poll the network client to make
sure all requests are sent.
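
   A rough plain-Java sketch of that final flush, using stand-in counters
instead of the real unsent-request buffer and network client:

```java
public class FinalFlushSketch {

    // Hypothetical stand-ins for buffered (unsent) and in-flight requests.
    static int unsentRequests = 1;
    static int inFlightRequests = 0;

    // Hand buffered requests to the network client; they become in-flight.
    static void trySend() {
        inFlightRequests += unsentRequests;
        unsentRequests = 0;
    }

    // One network poll: flush writes and, eventually, complete responses.
    static void poll() {
        if (inFlightRequests > 0) {
            inFlightRequests--;
        }
    }

    public static void main(String[] args) {
        long deadlineNanos = System.nanoTime() + 5_000_000L; // close budget

        // One last attempt to send whatever is still buffered, then poll until
        // everything has been flushed or the close budget is spent.
        trySend();
        while ((unsentRequests > 0 || inFlightRequests > 0) && System.nanoTime() < deadlineNanos) {
            poll();
        }
    }
}
```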



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418128177


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1024,15 +1037,14 @@ private void close(Duration timeout, boolean swallowException) {
     final Timer closeTimer = time.timer(timeout);
     clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
     closeTimer.update();
-
+    // Prepare shutting down the network thread
+    prepareShutdown(closeTimer, firstException);

Review Comment:
   Completing the tasks before shutting down the network thread. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]

2023-12-06 Thread via GitHub


philipnee commented on PR #14937:
URL: https://github.com/apache/kafka/pull/14937#issuecomment-1843871662

   Hi @kirktrue - I rewrote the previous PR based on your feedback. I thought
driving the close via events is a better and clearer pattern, so thanks for
the suggestions. Would you have time to take a look at this PR?
   
   @lucasbru - Thanks for reviewing the PR - per your suggestion, I've decided
to use Kirk's approach to close the consumer. Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org