philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1419683219
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean swallowException) { } } + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ + void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) { + if (!groupMetadata.isPresent()) + return; + maybeAutoCommitSync(autoCommitEnabled, timer, firstException); + timer.update(); + applicationEventHandler.add(new CommitOnCloseApplicationEvent()); + maybeRevokePartitions(timer, firstException); + waitOnCompletion( + () -> applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer), + "leave group on close", timer, firstException); + maybeInvokeCommitCallbacks(); + } + + // Visible for testing + void maybeRevokePartitions(final Timer timer, final AtomicReference<Throwable> firstException) { + if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) + return; + // TODO: We obviously needs to block until the partition revocation completes. + waitOnCompletion(this::invokePartitionRevocationListener, "revoke partitions", timer, firstException); + subscriptions.assignFromSubscribed(Collections.emptySet()); + } + + // Visible for testing + void maybeAutoCommitSync(final boolean shouldAutoCommit, + final Timer timer, + final AtomicReference<Throwable> firstException) { + if (!shouldAutoCommit) + return; + waitOnCompletion(() -> { + Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed(); + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + }, "autoCommitSync", timer, firstException); + } + + // Visible for testing + void waitOnCompletion(final Runnable function, + final String taskName, + final Timer timer, + final AtomicReference<Throwable> firstException) { + try { + function.run(); + } catch (Exception e) { + handleException(e, taskName, timer, Optional.of(firstException)); + } finally { + timer.update(); + } + } + + private void handleException(final Exception e, + final String taskName, + final Timer timer, + final Optional<AtomicReference<Throwable>> firstException) { + if (e instanceof TimeoutException) { + log.debug("Timeout of {}ms expired before the {} operation could complete.", timer.remainingMs(), taskName); + } else { + Exception exception = e; + if (e instanceof ExecutionException) + exception = (Exception) e.getCause(); + if (!firstException.isPresent()) + log.debug("Failed to execute {} operation due to {}", taskName, exception.getMessage()); + firstException.get().compareAndSet(null, exception); + } + } + + private CompletableFuture<Void> invokePartitionRevocationListener() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) + return CompletableFuture.completedFuture(null); + // TODO: Invoke rebalanceListener via KAFKA-15276 Review Comment: @kirktrue - I am not 100% sure what is the right way to invoke the listener. Are we returning a completable future? The current implementation blocks on listener invocation, which means where we need to do future.get(forever). If the listener is broken in some way, then we are stuck here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org