Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-29 Thread via GitHub


lucasbru merged PR #14842:
URL: https://github.com/apache/kafka/pull/14842


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-29 Thread via GitHub


lucasbru commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1831602145

   Test failures unrelated and mapped to existing/new flaky test issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-29 Thread via GitHub


cadonna commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1831462346

   I totally agree on not using spies. That is my opinion and also the Mockito 
documentation says to be really careful with spies and to just use them if 
absolutely needed. Spies are mostly used to test legacy code and they should 
not be used for new code. We should definitely not wrap the code to test into 
spies. In my current PR, a unit test was not working because of this wrapping.
   I started to improve the test code in a separate PR, but the PRs for the 
deadline have higher priority.   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


philipnee commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1830970851

   Here are the failures:
   ```
   Build / JDK 21 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   42s
   Build / JDK 21 and Scala 2.13 / testResetSinkConnectorOffsetsZombieSinkTasks 
– org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   58s
   Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   6s
   Build / JDK 21 and Scala 2.13 / 
testRackAwareRangeAssignor(String).quorum=kraft – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   6s
   Build / JDK 21 and Scala 2.13 / testWithGroupMetadata() – 
kafka.api.TransactionsBounceTest
   13s
   Build / JDK 21 and Scala 2.13 / testWithGroupMetadata() – 
kafka.api.TransactionsBounceTest
   14s
   Build / JDK 21 and Scala 2.13 / testFailureToFenceEpoch(String).quorum=kraft 
– kafka.api.TransactionsTest
   42s
   Build / JDK 17 and Scala 2.13 / testWithGroupId() – 
kafka.api.TransactionsBounceTest
   12s
   Build / JDK 17 and Scala 2.13 / testWithGroupId() – 
kafka.api.TransactionsBounceTest
   9s
   Build / JDK 17 and Scala 2.13 / testWrongIncarnationId() – 
kafka.server.ControllerRegistrationManagerTest
   30s
   Build / JDK 17 and Scala 2.13 / 
testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest
   <1s
   Build / JDK 17 and Scala 2.13 / 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).quorum=zk
 – org.apache.kafka.tools.TopicCommandIntegrationTest
   3s
   Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   42s
   Build / JDK 8 and Scala 2.12 / testAlterSinkConnectorOffsetsZombieSinkTasks 
– org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   24s
   Build / JDK 8 and Scala 2.12 / testWithGroupMetadata() – 
kafka.api.TransactionsBounceTest
   12s
   Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   15s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   2m 51s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
   2m 28s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   4m 26s
   Build / JDK 11 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   16s
   ```
   They are unrelated but seems like there's a movement of not merging failing 
builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408238807


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -234,11 +235,20 @@ public void 
testAutocommit_ResendAutocommitAfterException() {
 
 @Test
 public void testAutocommit_EnsureOnlyOneInflightRequest() {
+TopicPartition t1p = new TopicPartition("topic1", 0);
+subscriptionState.assignFromUser(singleton(t1p));
+//subscriptionState.seekUnvalidated(t1p, new 
SubscriptionState.FetchPosition(100L));

Review Comment:
   oh yes... 🤦 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -234,11 +235,20 @@ public void 
testAutocommit_ResendAutocommitAfterException() {
 
 @Test
 public void testAutocommit_EnsureOnlyOneInflightRequest() {
+TopicPartition t1p = new TopicPartition("topic1", 0);
+subscriptionState.assignFromUser(singleton(t1p));
+//subscriptionState.seekUnvalidated(t1p, new 
SubscriptionState.FetchPosition(100L));

Review Comment:
   oh yes... 🤦 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408236793


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -269,60 +275,56 @@ void cleanup() {
  * completed in time.
  */
 // Visible for testing
-void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+void maybeAutocommitOnClose(final Timer timer) {
 if (!requestManagers.coordinatorRequestManager.isPresent())
 return;
 
+if (!requestManagers.commitRequestManager.isPresent()) {
+log.error("Expecting a CommitRequestManager but the object was 
never initialized. Shutting down.");
+return;
+}
+
+if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+return;
+}
+
 ensureCoordinatorReady(timer);
-List tasks = closingRequests();
-networkClientDelegate.addAll(tasks);
+List autocommitRequest =

Review Comment:
   Cannot agree more!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408236102


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -210,17 +210,19 @@ public CompletableFuture 
maybeAutoCommitAllConsumed() {
 return maybeAutoCommit(subscriptions.allConsumed());
 }
 
+boolean canAutoCommit() {
+return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
+}
+
 /**
- * The consumer needs to send an auto commit during the shutdown if 
autocommit is enabled.
+ * Returns an OffsetCommitRequest of all assigned topicPartitions and 
their current positions.
  */
-Optional 
maybeCreateAutoCommitRequest() {
-if (!autoCommitState.isPresent()) {
-return Optional.empty();
-}
-
-OffsetCommitRequestState request = 
pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {

Review Comment:
   good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1408235006


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
  * completed future if no request is generated.
  */
 public CompletableFuture maybeAutoCommit(final Map offsets) {
-if (!autoCommitState.isPresent()) {
+if (!canAutoCommit()) {

Review Comment:
   In fact, let me change the name of 
   ```
   public boolean canSendAutocommit() {
   return !this.hasInflightCommit && this.timer.isExpired();
   }
   ```
   to `shouldAutoCommit, because when the timer has expired, we should send the 
autocommit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


lucasbru commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1829919586

   > Hi @lucasbru - I just opened the PR for you to review. I'm not 100% happy 
with the way tests are setup therefore I made some changes around optionally 
disabling autocommit in the network thread. Also, I feel the tests here kind of 
become some sort of integration testing. I thought it kind of against the unit 
test philosophy. 
   
   Yeah. I think the tests may be using `spy` too much, and should use `mock` 
instead. It's definitely running too much code, which makes it more like an 
integration test, but still introspect a lot, which make them more like a unit 
test. But it's not something that we are going to fix in this PR. I wonder if 
we can maybe create a ticket to migrate the `AsyncKafkaConsumerTest` to a more 
mocking based approach, is that something that would be feasible in a 
reasonable amount of time? cc @cadonna who always has strong opinions on unit 
testing.
   
   > 1. We will only auto commit if the configuration is enabled (by default) 
or if we've got anything to commit at all
   
   Makes sense
   
   > 2. We need to enforce finding a coordinator and send an autocommit 
regardless of the previous commit state because we need to make sure to record 
the progress before closing
   
   Makes sense
   
   > 3. Quite a bit of changes to the testing, because autocommit depends on 
the current progress, so I need to "seek" for some cases to ensure the test 
sends an autocommit
   
   Makes sense


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-28 Thread via GitHub


lucasbru commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407524109


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -269,60 +275,56 @@ void cleanup() {
  * completed in time.
  */
 // Visible for testing
-void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+void maybeAutocommitOnClose(final Timer timer) {
 if (!requestManagers.coordinatorRequestManager.isPresent())
 return;
 
+if (!requestManagers.commitRequestManager.isPresent()) {
+log.error("Expecting a CommitRequestManager but the object was 
never initialized. Shutting down.");
+return;
+}
+
+if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+return;
+}
+
 ensureCoordinatorReady(timer);
-List tasks = closingRequests();
-networkClientDelegate.addAll(tasks);
+List autocommitRequest =

Review Comment:
   Let's not use a list here, then we can drop the `get(0)` below. You can 
create the list in place in the line below or add a `NetworkClientDelegate.add` 
method.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -796,5 +823,45 @@ private HashMap 
mockTimestampToSearch() {
 timestampToSearch.put(t1, 2L);
 return timestampToSearch;
 }
+
+private void prepAutocommitOnClose() {
+Node node = testBuilder.metadata.fetch().nodes().get(0);
+
testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
 "group-id", node));
+if (!testBuilder.subscriptions.allConsumed().isEmpty()) {
+List topicPartitions = new 
ArrayList<>(testBuilder.subscriptions.assignedPartitionsList());
+testBuilder.client.prepareResponse(mockAutocommitResponse(
+topicPartitions,
+(short) 1,
+Errors.NONE).responseBody());
+}
+}
+private ClientResponse mockAutocommitResponse(final List 
topicPartitions,

Review Comment:
   nit: newline in between the methods



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
  * completed future if no request is generated.
  */
 public CompletableFuture maybeAutoCommit(final Map offsets) {
-if (!autoCommitState.isPresent()) {
+if (!canAutoCommit()) {

Review Comment:
   maybe `shouldAutoCommit`? Because it's more about being obliged to commit 
and not about being able to commit? Also, try to be consistent with 
`AutoCommit` vs. `Autocommit`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -210,17 +210,19 @@ public CompletableFuture 
maybeAutoCommitAllConsumed() {
 return maybeAutoCommit(subscriptions.allConsumed());
 }
 
+boolean canAutoCommit() {
+return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
+}
+
 /**
- * The consumer needs to send an auto commit during the shutdown if 
autocommit is enabled.
+ * Returns an OffsetCommitRequest of all assigned topicPartitions and 
their current positions.
  */
-Optional 
maybeCreateAutoCommitRequest() {
-if (!autoCommitState.isPresent()) {
-return Optional.empty();
-}
-
-OffsetCommitRequestState request = 
pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {

Review Comment:
   Again, maybe make clear that this only creates the request. also, in other 
methods you use `AllConsumed` and drop the `Positions`, so maybe 
`createCommitAllConsumedRequest` ?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -234,11 +235,20 @@ public void 
testAutocommit_ResendAutocommitAfterException() {
 
 @Test
 public void testAutocommit_EnsureOnlyOneInflightRequest() {
+TopicPartition t1p = new TopicPartition("topic1", 0);
+subscriptionState.assignFromUser(singleton(t1p));
+//subscriptionState.seekUnvalidated(t1p, new 
SubscriptionState.FetchPosition(100L));

Review Comment:
   nit: maybe remove the commented line  if we do not need it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198957


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -269,60 +275,56 @@ void cleanup() {
  * completed in time.
  */
 // Visible for testing
-void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+void maybeAutocommitOnClose(final Timer timer) {
 if (!requestManagers.coordinatorRequestManager.isPresent())
 return;
 
+if (!requestManagers.commitRequestManager.isPresent()) {
+log.error("Expecting a CommitRequestManager but the object was 
never initialized. Shutting down.");
+return;
+}
+
+if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+return;
+}
+
 ensureCoordinatorReady(timer);
-List tasks = closingRequests();
-networkClientDelegate.addAll(tasks);
+List autocommitRequest =
+
Collections.singletonList(requestManagers.commitRequestManager.get().commitAllConsumedPositions());
+networkClientDelegate.addAll(autocommitRequest);
 do {
 long currentTimeMs = timer.currentTimeMs();
 ensureCoordinatorReady(timer);
 networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-} while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+} while (timer.notExpired() && 
!autocommitRequest.get(0).future().isDone());
+}
+
+void maybeLeaveGroup(final Timer timer) {
+// TODO: Leave group upon closing the consumer
 }
 
 private void ensureCoordinatorReady(final Timer timer) {
-while (!coordinatorReady()) {
+while (!coordinatorReady() && timer.notExpired()) {
 findCoordinatorSync(timer);
 }
 }
 
 private boolean coordinatorReady() {
-CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.orElseThrow(
+() -> new IllegalStateException("CoordinatorRequestManager 
uninitialized."));

Review Comment:
   Probably unnecessary - but adding this for a safety check wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198120


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -210,17 +210,19 @@ public CompletableFuture 
maybeAutoCommitAllConsumed() {
 return maybeAutoCommit(subscriptions.allConsumed());
 }
 
+boolean canAutoCommit() {

Review Comment:
   Unhappy with the naming, but can't seem to find a better way to restructure 
this at the moment.  Didn't like it because there is an infinite variation of 
canXxxx().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent())
 return EMPTY;
 
-maybeAutoCommitAllConsumed();
+maybeAutoCommit();

Review Comment:
   we always commit allConsumed(), so there's no point to reinstate that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1828686697

   Hi @lucasbru - I just opened the PR for you to review.  I'm not 100% happy 
with the way tests are setup therefore I made some changes around optionally 
disabling autocommit in the network thread.  Also, I feel the tests here kind 
of become some sort of integration testing.  I thought it kind of against the 
unit test philosophy.  In summary, the changes are:
   1. We will only auto commit if the configuration is enabled (by default) or 
if we've got anything to commit at all
   2. We need to enforce finding a coordinator and send an autocommit 
regardless of the previous commit state because we need to make sure to record 
the progress before closing
   3. Quite a bit of changes to the testing, because autocommit depends on the 
current progress, so I need to "seek" for some cases to ensure the test sends 
an autocommit
   
   LMK what do you think!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406792003


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java:
##
@@ -279,7 +287,7 @@ public 
ConsumerNetworkThreadTestBuilder(Optional groupInfo) {
 
 @Override
 public void close() {
-closeQuietly(consumerNetworkThread, 
ConsumerNetworkThread.class.getSimpleName());
+consumerNetworkThread.close();

Review Comment:
   I don't think we should suppress the failures on closing during testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406789593


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -206,21 +206,21 @@ public CompletableFuture maybeAutoCommit(final 
Map maybeAutoCommitAllConsumed() {
+public CompletableFuture maybeAutoCommit() {
 return maybeAutoCommit(subscriptions.allConsumed());
 }
 
+boolean canAutoCommit() {
+return autoCommitState.isPresent() && 
!subscriptions.allConsumed().isEmpty();
+}
+
 /**
- * The consumer needs to send an auto commit during the shutdown if 
autocommit is enabled.
+ * Return an OffsetCommitRequest of all assigned topicPartitions and their 
current positions.
  */
-Optional 
maybeCreateAutoCommitRequest() {
-if (!autoCommitState.isPresent()) {
-return Optional.empty();
-}
-
+NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() {
 OffsetCommitRequestState request = 
pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
 
request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed()));
-return Optional.of(request.toUnsentRequest());
+return request.toUnsentRequest();

Review Comment:
   We should always return a request because I moved that check out of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
  * completed future if no request is generated.
  */
 public CompletableFuture maybeAutoCommit(final Map offsets) {
-if (!autoCommitState.isPresent()) {
+if (!canAutoCommit()) {

Review Comment:
   pretty terrible naming because it kind of overlaps with the one below, not 
sure there's a better description for it.  It needs to check 1. if autocommit 
is enabled and 2. if there's anything to commit.  If neither, then we don't try 
to send a commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent())
 return EMPTY;
 
-maybeAutoCommitAllConsumed();
+maybeAutoCommit();

Review Comment:
   we always commit allConsumed(), so there's no point to reinstate that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -184,7 +184,7 @@ private static long findMinTime(final Collection request
  * completed future if no request is generated.
  */
 public CompletableFuture maybeAutoCommit(final Map offsets) {
-if (!autoCommitState.isPresent()) {
+if (!canAutoCommit()) {

Review Comment:
   pretty terrible words, not sure there's a better description for it.  It 
needs to check 1. if autocommit is enabled and 2. if there's anything to 
commit.  If neither, then we don't try to send a commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406699542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -269,22 +270,40 @@ void cleanup() {
  * completed in time.
  */
 // Visible for testing
-void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+void maybeAutocommitOnClose(final Timer timer) {
 if (!requestManagers.coordinatorRequestManager.isPresent())
 return;
 
+if (!requestManagers.commitRequestManager.isPresent()) {
+log.error("Expecting a CommitRequestManager but the object was 
never initialized. Shutting down.");
+return;
+}
+
+if (!requestManagers.commitRequestManager.get().autoCommitEnabled()) {
+return;
+}
+
 ensureCoordinatorReady(timer);
-List tasks = closingRequests();
-networkClientDelegate.addAll(tasks);
+Optional autocommit = 
requestManagers.commitRequestManager.get().maybeCreateAutoCommitRequest();
+if (!autocommit.isPresent()) {
+return;
+}
+
+List autocommitRequest = 
Collections.singletonList(autocommit.get());
+networkClientDelegate.addAll(autocommitRequest);
 do {
 long currentTimeMs = timer.currentTimeMs();
 ensureCoordinatorReady(timer);
 networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-} while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+} while (timer.notExpired() && 
!autocommitRequest.get(0).future().isDone());
+}
+
+void maybeLeaveGroup(final Timer timer) {
+// TODO: Leave group upon closing the consumer

Review Comment:
   Can I follow up with a subsequent ticket to address this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


lucasbru commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1827393124

   @philipnee Sounds good. Let me know when it's ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on PR #14842:
URL: https://github.com/apache/kafka/pull/14842#issuecomment-1827329339

   Hi @lucasbru - This PR should have addressed the issue in KAFKA-15887.  I 
also fixed a couple of issues while reviewing the code. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-27 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1405778947


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -218,6 +218,10 @@ Optional 
maybeCreateAutoCommitRequest() {
 return Optional.empty();
 }
 
+if (subscriptions.allConsumed().isEmpty()) {

Review Comment:
   For async request, we don't try to create one if there's no progress being 
made



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-26 Thread via GitHub


philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1405771339


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -254,8 +255,9 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
-maybeAutoCommitAndLeaveGroup(timer);
+maybeAutocommitOnClose(timer);

Review Comment:
   After reviewing the ConsumerCoordinator, I split the original method into 
two functions: We should try to send an autocommit first and leave group last.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]

2023-11-26 Thread via GitHub


philipnee opened a new pull request, #14842:
URL: https://github.com/apache/kafka/pull/14842

   A few bugs was created from the previous issues. These are:
   1. During testing or some edge cases, the coordinator request manager might 
hold on to an inflight request forever. Therefore, when invoking 
coordinatorRequestManager.poll(), nothing would return.  Here we explicitly 
create a FindCoordinatorRequest regardless of the current request state because 
we want to actively search for a coordinator
   2. ensureCoordinatorReady() might be stuck in an infinite loop forever if 
the client fail to do so.  Even the consumer would be able to shutdown 
eventually, this is undesirable.
   3. The current asyncConsumerTest mixes background/network thread shutdown 
with the consumer shutdown.  As the goal of the module is unit testing, we 
should try to test the shutdown procedure separately.  Therefore, this PR adds 
a Mockito.doAnswer call to the applicationEventHandler.close().  Tests that are 
testing shutdown are calling shutdown() explicitly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org