philipnee commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1419683219


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+        timer.update();
+        applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+        maybeRevokePartitions(timer, firstException);
+        waitOnCompletion(
+            () -> applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer),
+            "leave group on close", timer, firstException);
+        maybeInvokeCommitCallbacks();
+    }
+
+    // Visible for testing
+    void maybeRevokePartitions(final Timer timer, final AtomicReference<Throwable> firstException) {
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+        // TODO: We obviously needs to block until the partition revocation completes.
+        waitOnCompletion(this::invokePartitionRevocationListener, "revoke partitions", timer, firstException);
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+    }
+
+    // Visible for testing
+    void maybeAutoCommitSync(final boolean shouldAutoCommit,
+                             final Timer timer,
+                             final AtomicReference<Throwable> firstException) {
+        if (!shouldAutoCommit)
+            return;
+        waitOnCompletion(() -> {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+            log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+        }, "autoCommitSync", timer, firstException);
+    }
+
+    // Visible for testing
+    void waitOnCompletion(final Runnable function,
+                                  final String taskName,
+                                  final Timer timer,
+                                  final AtomicReference<Throwable> firstException) {
+        try {
+            function.run();
+        } catch (Exception e) {
+            handleException(e, taskName, timer, Optional.of(firstException));
+        } finally {
+            timer.update();
+        }
+    }
+
+    private void handleException(final Exception e,
+                                 final String taskName,
+                                 final Timer timer,
+                                 final Optional<AtomicReference<Throwable>> firstException) {
+        if (e instanceof TimeoutException) {
+            log.debug("Timeout of {}ms expired before the {} operation could complete.", timer.remainingMs(), taskName);
+        } else {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            if (!firstException.isPresent())
+                log.debug("Failed to execute {} operation due to {}", taskName, exception.getMessage());
+            firstException.get().compareAndSet(null, exception);
+        }
+    }
+
+    private CompletableFuture<Void> invokePartitionRevocationListener() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty())
+            return CompletableFuture.completedFuture(null);
+        // TODO: Invoke rebalanceListener via KAFKA-15276

Review Comment:
   @kirktrue - I am not 100% sure what the right way to invoke the listener is.
 Are we returning a completable future? The current implementation blocks on
listener invocation, which means we would need to do future.get(forever). If
the listener is broken in some way, then we are stuck here.
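
   To make the concern concrete, here is a minimal sketch of waiting on a
listener future with a bounded timeout instead of forever. It is not the
AsyncKafkaConsumer implementation: `BoundedListenerWait`, `Outcome` and
`awaitListener` are hypothetical names, the timeout type is the JDK's
`java.util.concurrent.TimeoutException` rather than Kafka's, and it assumes
the listener runs on another thread that completes the future. If the
listener runs on the calling thread, a bounded `get` does not help.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedListenerWait {

    /** Hypothetical result type describing how the wait ended. */
    enum Outcome { COMPLETED, TIMED_OUT, FAILED }

    /**
     * Waits at most remainingMs for the listener future (assumed to be
     * completed by some other thread) instead of blocking indefinitely.
     */
    static Outcome awaitListener(CompletableFuture<Void> listenerFuture, long remainingMs) {
        try {
            listenerFuture.get(remainingMs, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            // The listener did not finish in time; the caller can continue closing.
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The listener threw; e.getCause() carries the original error.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> healthy = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // never completed
        System.out.println(awaitListener(healthy, 100));
        System.out.println(awaitListener(stuck, 100));
    }
}
```

   In the diff above, the remaining time would presumably come from
`timer.remainingMs()`, as `maybeAutoCommitSync` already does for `commitSync`.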



