kirktrue commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1416404354

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -957,6 +966,57 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer) {
+        if (!groupMetadata.isPresent())
+            return;
+
+        maybeAutoCommitSync(timer);
+        timer.update();
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+
+        try {
+            // If the consumer is in a group, we will pause and revoke all assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            throw new KafkaException("User rebalance callback throws an error", exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+                commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   I would imagine that the leave group process could be mostly performed using an event and a callback execution, like in #14931. We'd need to submit an event to the background thread (e.g. `PrepareCloseEvent`) so that the member manager can orchestrate the callback request and the heartbeat request.


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -931,15 +941,14 @@ private void close(Duration timeout, boolean swallowException) {
         final Timer closeTimer = time.timer(timeout);
         clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
         closeTimer.update();
-
+        // Prepare shutting down the network thread
+        swallow(log, Level.ERROR, "Unexpected exception when preparing for shutdown", () -> prepareShutdown(closeTimer), firstException);
+        closeTimer.update();
         if (applicationEventHandler != null)
-            closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
-
-        // Invoke all callbacks after the background thread exists in case if there are unsent async
-        // commits
-        maybeInvokeCommitCallbacks();
-
-        closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
+            closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
+        closeTimer.update();
+        // Ensure all async commit callbacks are invoked
+        swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException);

Review Comment:
   These should be done before we shut down the network, right?

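
For illustration only, the event-based handoff suggested in the first review comment might look roughly like the sketch below. It is a toy model, not Kafka code: only the name `PrepareCloseEvent` comes from the comment, while its future field, the queue, the step strings, and `runDemo` are assumptions made up for this example. The application thread enqueues the event and blocks on its future with a timeout, and only treats the network thread as safe to shut down once the background thread has completed it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrepareCloseSketch {

    /** Hypothetical event: only the name appears in the review comment. */
    static final class PrepareCloseEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    /** Runs the handoff once and returns the steps in the order they happened. */
    static List<String> runDemo(long timeoutMs) throws Exception {
        List<String> steps = new CopyOnWriteArrayList<>();
        BlockingQueue<PrepareCloseEvent> queue = new LinkedBlockingQueue<>();

        // Stand-in for the background thread that would own the member manager.
        Thread background = new Thread(() -> {
            try {
                PrepareCloseEvent event = queue.take();
                // Assumed orchestration order: callback request, then heartbeat.
                steps.add("revocation callback requested");
                steps.add("leave-group heartbeat sent");
                event.future.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-sketch");
        background.setDaemon(true);
        background.start();

        // Application thread: submit the event, then wait within the close timeout.
        PrepareCloseEvent event = new PrepareCloseEvent();
        queue.add(event);
        event.future.get(timeoutMs, TimeUnit.MILLISECONDS);

        // Only after the event completes would the network thread be closed.
        steps.add("network thread shut down");
        background.join(timeoutMs);
        return steps;
    }

    public static void main(String[] args) throws Exception {
        runDemo(1000).forEach(System.out::println);
    }
}
```

Waiting on the event's future before closing keeps the ordering explicit: anything that still needs the network (here, the simulated heartbeat) finishes first, which is also the concern raised in the second comment.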