lucasbru commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1399134315


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -90,14 +93,13 @@ public void run() {
             while (running) {
                 try {
                     runOnce();
-                } catch (final WakeupException e) {
-                    log.debug("WakeupException caught, consumer network thread won't be interrupted");
+                } catch (final Throwable e) {
+                    log.error("Unexpected error caught in consumer network thread", e);
                     // swallow the wakeup exception to prevent killing the thread.

Review Comment:
   This comment is now outdated.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +257,77 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        waitForClosingTasks(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    /**
+     * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator
+     * node to construct the closing requests.  Then wait for all closing requests to finish before returning.  The
+     * method is bounded by a closing timer.  We will continue closing down the consumer if the requests cannot be
+     * completed in time.
+     */
+    // Visible for testing
+    void waitForClosingTasks(final Timer timer) {

Review Comment:
   Could this code be moved into `runAtClose`? What I'm really wondering is
   what belongs in `runAtClose` and what belongs in `waitForClosingTasks`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -180,12 +188,47 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets
         autocommit.setInflightCommitStatus(true);
     }
 
+    /**
+     * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+     */
+    Optional<NetworkClientDelegate.UnsentRequest> maybeAutoCommit() {

Review Comment:
   Maybe make it clear from the method name that the request is not sent here
   (e.g. `maybeCreateAutoCommitRequest`)? The other `maybeAutoCommit` _does_
   send the request.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -219,4 +219,14 @@ private void onResponse(
     public Optional<Node> coordinator() {
         return Optional.ofNullable(this.coordinator);
     }
+
+    @Override
+    public NetworkClientDelegate.PollResult pollOnClose() {

Review Comment:
   Why are you adding this? I see you already wrote custom code to find the
   coordinator in `ConsumerNetworkThread`. Do we have to do it again during
   `runAtClose`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);
+
+        List<NetworkClientDelegate.UnsentRequest> tasks = closingTasks();
+        do {
+            long currentTimeMs = timer.currentTimeMs();
+            connectCoordinator(timer);
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+    }
+
+    private void connectCoordinator(final Timer timer) {
+        while (!coordinatorReady()) {
+            findCoordinatorSync(timer);
+        }
+    }
+
+    private boolean coordinatorReady() {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        Optional<Node> coordinator = coordinatorRequestManager.coordinator();
+        return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get());
+    }
+
+    private void findCoordinatorSync(final Timer timer) {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        long currentTimeMs = timer.currentTimeMs();
+        NetworkClientDelegate.PollResult request = coordinatorRequestManager.pollOnClose();
+        networkClientDelegate.addAll(request);
+        CompletableFuture<ClientResponse> findCoordinatorRequest = request.unsentRequests.get(0).future();
+        while (timer.notExpired() && !findCoordinatorRequest.isDone()) {
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+            timer.update();
+        }
+    }
+
+    private List<NetworkClientDelegate.UnsentRequest> maybeAutoCommitOnClose() {

Review Comment:
   But you are just moving the `isPresent` here (so you are not saving code or 
time), and have a function signature that tells you less about what is going on 
inside the function.
   
   By the way, if you prefer, you can do
   
```maybeAutoCommitOnClose().map(Collections::singletonList).orElseGet(Collections::emptyList);```
   in `closingTasks`
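
   To illustrate the one-liner above, here is a minimal, self-contained sketch
   of the `Optional`-to-`List` idiom. `OptionalToList` and `Request` are
   hypothetical stand-ins for illustration, not Kafka classes; only the
   `java.util` calls are real.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class OptionalToList {
    // Hypothetical stand-in for NetworkClientDelegate.UnsentRequest.
    static final class Request {
        final String name;

        Request(String name) {
            this.name = name;
        }
    }

    // Turns an optional request into a list with zero or one element,
    // without an explicit isPresent() check.
    static List<Request> closingRequests(Optional<Request> maybeRequest) {
        return maybeRequest
                .map(Collections::singletonList)
                .orElseGet(Collections::emptyList);
    }

    public static void main(String[] args) {
        System.out.println(closingRequests(Optional.empty()).size());
        System.out.println(closingRequests(Optional.of(new Request("autocommit"))).size());
    }
}
```

   With this shape the caller never sees `null`, and the signature of the
   helper can stay `Optional<...>` so it says what the function does.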



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -180,12 +188,47 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets
         autocommit.setInflightCommitStatus(true);
     }
 
+    /**
+     * The consumer needs to send an auto commit during the shutdown if autocommit is enabled.
+     */
+    Optional<NetworkClientDelegate.UnsentRequest> maybeAutoCommit() {
+        if (!autoCommitState.isPresent()) {
+            return Optional.empty();
+        }
+
+        OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+        request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed()));
+        return Optional.of(request.toUnsentRequest());
+    }
+
+   // Visible for testing
+    CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {

Review Comment:
   This is marked as visible for testing, but it is not used in any test.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);

Review Comment:
   Got it, thanks. 
   
    By the way, can we use the name `ensureCoordinatorReady` or 
`ensureCoordinatorKnownAndReadySync` instead of `connectCoordinator`? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -90,14 +93,13 @@ public void run() {
             while (running) {
                 try {
                     runOnce();
-                } catch (final WakeupException e) {
-                    log.debug("WakeupException caught, consumer network thread won't be interrupted");
+                } catch (final Throwable e) {
+                    log.error("Unexpected error caught in consumer network thread", e);
                     // swallow the wakeup exception to prevent killing the thread.
                 }
             }
         } catch (final Throwable t) {

Review Comment:
   What are we even catching here? It seems like everything is already caught 
inside the loop.
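
   A minimal sketch of why the outer handler looks redundant, using
   hypothetical names (`NestedCatchDemo`, `runLoop`) and a simulated failure
   in place of `runOnce()`. Anything thrown by the loop body is handled by the
   inner `catch (Throwable)`, so the outer one would only fire if the inner
   handler itself or the loop condition threw.

```java
public class NestedCatchDemo {
    // Runs the loop and returns {innerCaught, outerCaught}.
    static int[] runLoop(int iterations) {
        int innerCaught = 0;
        int outerCaught = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                try {
                    // Stand-in for runOnce(): fails on every iteration.
                    throw new IllegalStateException("simulated failure " + i);
                } catch (final Throwable e) {
                    // Every failure from the loop body ends up here.
                    innerCaught++;
                }
            }
        } catch (final Throwable t) {
            // Not reached for failures thrown by the loop body.
            outerCaught++;
        }
        return new int[] {innerCaught, outerCaught};
    }

    public static void main(String[] args) {
        int[] counts = runLoop(3);
        System.out.println("inner=" + counts[0] + " outer=" + counts[1]);
    }
}
```

   Running `main` prints `inner=3 outer=0`.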



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java:
##########
@@ -35,7 +35,6 @@ public interface RequestManager {
      * synchronization protection in this method's implementation.
      *
      * <p/>
-     *

Review Comment:
   Let's not make random changes like this



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +257,77 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        waitForClosingTasks(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    /**
+     * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator
+     * node to construct the closing requests.  Then wait for all closing requests to finish before returning.  The
+     * method is bounded by a closing timer.  We will continue closing down the consumer if the requests cannot be
+     * completed in time.
+     */
+    // Visible for testing
+    void waitForClosingTasks(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);
+        List<NetworkClientDelegate.UnsentRequest> tasks = closingTasks();
+        networkClientDelegate.addAll(tasks);
+        do {
+            long currentTimeMs = timer.currentTimeMs();
+            connectCoordinator(timer);
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+    }
+
+    private void connectCoordinator(final Timer timer) {
+        while (!coordinatorReady()) {
+            findCoordinatorSync(timer);
+        }
+    }
+
+    private boolean coordinatorReady() {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        Optional<Node> coordinator = coordinatorRequestManager.coordinator();
+        return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get());
+    }
+
+    private void findCoordinatorSync(final Timer timer) {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        NetworkClientDelegate.PollResult request = coordinatorRequestManager.pollOnClose();
+        networkClientDelegate.addAll(request);
+        CompletableFuture<ClientResponse> findCoordinatorRequest = request.unsentRequests.get(0).future();
+        while (timer.notExpired() && !findCoordinatorRequest.isDone()) {
+            networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
+            timer.update();
+        }
+    }
+
+    private List<NetworkClientDelegate.UnsentRequest> maybeAutoCommitOnClose() {
+        if (!requestManagers.commitRequestManager.isPresent()) {
+            return null;
+        }
+        Optional<NetworkClientDelegate.UnsentRequest> autocommit = requestManagers.commitRequestManager.get().maybeAutoCommit();
+        if (!autocommit.isPresent()) {
+            return Collections.emptyList();
+        }
+
+        return Collections.singletonList(autocommit.get());
+    }
+
+    /**
+     * Aggregate all requests that need to be sent before closing.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> closingTasks() {

Review Comment:
   You are using both `closingTasks` and `closingRequests` here, and I think I 
like the second better as it's more precise.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AutoCommitCompletionBackgroundEvent.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class AutoCommitCompletionBackgroundEvent extends BackgroundEvent {

Review Comment:
   Where are we handling this event?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org