showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r748720272



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -692,10 +692,25 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        boolean onJoinPrepareAsyncCommitCompleted = false;
+        // async commit offsets prior to rebalance if auto-commit enabled
+        // and if auto-commit disable or the coordinatorUnknown is true, the 
future will be null,
+        // the asynchronous commit operation will not do.

Review comment:
       // and if auto-commit disable or the coordinatorUnknown is true, the 
future will be 
   // the asynchronous commit operation will not do.
   
   --> // null future means no offset commit request sent, so it is still 
considered completed

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -692,10 +692,25 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        boolean onJoinPrepareAsyncCommitCompleted = false;
+        // async commit offsets prior to rebalance if auto-commit enabled
+        // and if auto-commit disable or the coordinatorUnknown is true, the 
future will be null,
+        // the asynchronous commit operation will not do.
+        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        if (future == null)
+            onJoinPrepareAsyncCommitCompleted = true;
+        else {
+            if (future.succeeded()) {
+                onJoinPrepareAsyncCommitCompleted = true;
+            } else if (future.failed()) {
+                // consistent with async auto-commit failures, we do not 
propagate the exception
+                log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());

Review comment:
       Sorry, I found we already log the error in `autoCommitOffsetsAsync`. We 
should remove the logging here. And make the if condition simpler as: 
   ```
   if (future == null || future.succeeded() || (future.failed() && 
!future.isRetriable())) {
       onJoinPrepareAsyncCommitCompleted = true;
   }

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -692,10 +692,25 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        boolean onJoinPrepareAsyncCommitCompleted = false;
+        // async commit offsets prior to rebalance if auto-commit enabled
+        // and if auto-commit disable or the coordinatorUnknown is true, the 
future will be null,
+        // the asynchronous commit operation will not do.
+        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        if (future == null)
+            onJoinPrepareAsyncCommitCompleted = true;
+        else {
+            if (future.succeeded()) {
+                onJoinPrepareAsyncCommitCompleted = true;
+            } else if (future.failed()) {
+                // consistent with async auto-commit failures, we do not 
propagate the exception
+                log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());

Review comment:
       Actually, after the concise if statement above, we can add comments like 
this:
   ```
   // return true when
   // 1. future is null, which means no commit request sent
   // 2. offset commit completed
   // 3. offset commit failed with non-retriable error
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1073,22 +1092,14 @@ private void doAutoCommitOffsetsAsync() {
         });
     }
 
-    private void maybeAutoCommitOffsetsSync(Timer timer) {
+    private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
         if (autoCommitEnabled) {
-            Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
-            try {
-                log.debug("Sending synchronous auto-commit of offsets {}", 
allConsumedOffsets);
-                if (!commitOffsetsSync(allConsumedOffsets, timer))
-                    log.debug("Auto-commit of offsets {} timed out before 
completion", allConsumedOffsets);
-            } catch (WakeupException | InterruptException e) {
-                log.debug("Auto-commit of offsets {} was interrupted before 
completion", allConsumedOffsets);
-                // rethrow wakeups since they are triggered by the user
-                throw e;
-            } catch (Exception e) {
-                // consistent with async auto-commit failures, we do not 
propagate the exception
-                log.warn("Synchronous auto-commit of offsets {} failed: {}", 
allConsumedOffsets, e.getMessage());
-            }
-        }
+            RequestFuture<Void> future = autoCommitOffsetsAsync();
+            client.pollNoWakeup();
+            invokeCompletedOffsetCommitCallbacks();
+            return future;
+        } else
+            return null;

Review comment:
       nit: remove `else` here, return null at the end.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to