showuon commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r748720272
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -692,10 +692,25 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition } @Override - protected void onJoinPrepare(int generation, String memberId) { + protected boolean onJoinPrepare(int generation, String memberId) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); - // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); + boolean onJoinPrepareAsyncCommitCompleted = false; + // async commit offsets prior to rebalance if auto-commit enabled + // and if auto-commit disable or the coordinatorUnknown is true, the future will be null, + // the asynchronous commit operation will not do. Review comment: // and if auto-commit disable or the coordinatorUnknown is true, the future will be // the asynchronous commit operation will not do. --> // null future means no offset commit request sent, so it is still considered completed ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -692,10 +692,25 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition } @Override - protected void onJoinPrepare(int generation, String memberId) { + protected boolean onJoinPrepare(int generation, String memberId) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); - // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); + boolean onJoinPrepareAsyncCommitCompleted = false; + // async commit offsets prior to rebalance if auto-commit enabled + // and if auto-commit disable or the coordinatorUnknown is true, the future will be null, + // the asynchronous commit operation will not do. + RequestFuture<Void> future = maybeAutoCommitOffsetsAsync(); + if (future == null) + onJoinPrepareAsyncCommitCompleted = true; + else { + if (future.succeeded()) { + onJoinPrepareAsyncCommitCompleted = true; + } else if (future.failed()) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage()); Review comment: Sorry, I found we already log the error in `autoCommitOffsetsAsync`. We should remove the logging here. And make the if condition simpler as: ``` if (future == null || future.succeeded() || (future.failed() && !future.isRetriable())) { onJoinPrepareAsyncCommitCompleted = true; } ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -692,10 +692,25 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition } @Override - protected void onJoinPrepare(int generation, String memberId) { + protected boolean onJoinPrepare(int generation, String memberId) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); - // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); + boolean onJoinPrepareAsyncCommitCompleted = false; + // async commit offsets prior to rebalance if auto-commit enabled + // and if auto-commit disable or the coordinatorUnknown is true, the future will be null, + // the asynchronous commit operation will not do. + RequestFuture<Void> future = maybeAutoCommitOffsetsAsync(); + if (future == null) + onJoinPrepareAsyncCommitCompleted = true; + else { + if (future.succeeded()) { + onJoinPrepareAsyncCommitCompleted = true; + } else if (future.failed()) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage()); Review comment: Actually, after the concise if statement above, we can add comments like this: ``` // return true when // 1. future is null, which means no commit request sent // 2. offset commit completed // 3. offset commit failed with non-retriable error ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -1073,22 +1092,14 @@ private void doAutoCommitOffsetsAsync() { }); } - private void maybeAutoCommitOffsetsSync(Timer timer) { + private RequestFuture<Void> maybeAutoCommitOffsetsAsync() { if (autoCommitEnabled) { - Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); - try { - log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets); - if (!commitOffsetsSync(allConsumedOffsets, timer)) - log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets); - } catch (WakeupException | InterruptException e) { - log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets); - // rethrow wakeups since they are triggered by the user - throw e; - } catch (Exception e) { - // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage()); - } - } + RequestFuture<Void> future = autoCommitOffsetsAsync(); + client.pollNoWakeup(); + invokeCompletedOffsetCommitCallbacks(); + return future; + } else + return null; Review comment: nit: remove `else` here, return null at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org