kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
     @AfterEach
     public void resetAll() {
         backgroundEventQueue.clear();
-        if (consumer != null) {
+        try {
             consumer.close(Duration.ZERO);
+        } catch (Exception e) {
+            // ignore

Review Comment:
   I'm a little leery about swallowing the exception here. Can we validate that the exception type is something we expect? e.g.:
   
   ```suggestion
           } catch (Exception e) {
               assertInstanceOf(KafkaException.class, e);
   ```
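
   To illustrate the idea outside of JUnit, here is a minimal, self-contained sketch. `ExpectedCloseException` and `TeardownSketch.closeExpecting` are hypothetical stand-ins (not Kafka or JUnit APIs) for `KafkaException` and the teardown logic; the point is only that an unexpected exception type fails loudly instead of being ignored.

```java
// Hypothetical stand-in for the exception type the teardown expects
// (KafkaException in the real test); not part of Kafka.
class ExpectedCloseException extends RuntimeException {
    ExpectedCloseException(String message) {
        super(message);
    }
}

// Hypothetical helper, for illustration only.
class TeardownSketch {
    // Closes the resource and tolerates only the expected exception type.
    // Any other exception (including a NullPointerException from a null
    // resource) is reported as an AssertionError.
    static void closeExpecting(AutoCloseable resource, Class<? extends Exception> expected) {
        try {
            resource.close();
        } catch (Exception e) {
            if (!expected.isInstance(e)) {
                throw new AssertionError("Unexpected exception during close: " + e, e);
            }
        }
    }

    public static void main(String[] args) {
        // Expected type: tolerated silently.
        closeExpecting(() -> { throw new ExpectedCloseException("already closed"); },
                ExpectedCloseException.class);

        // Unexpected type: surfaces as an AssertionError.
        try {
            closeExpecting(() -> { throw new IllegalStateException("bug"); },
                    ExpectedCloseException.class);
        } catch (AssertionError e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

   Note that with the `null` check removed, a `null` consumer would raise a `NullPointerException`, which a type check like this would flag rather than hide.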



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,37 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) {
+        if (lastPendingAsyncCommit == null) {
+            return;
+        }
+
+        try {
+            CompletableFuture<Void> futureToAwait;
+            if (!disableWakeup) {
+                // We don't want the wake-up trigger to complete our pending async commit future,
+                // so create new future here.
+                futureToAwait = new CompletableFuture<>();
+                lastPendingAsyncCommit.whenComplete((v, t) -> {
+                    if (t != null) {
+                        futureToAwait.completeExceptionally(t);
+                    } else {
+                        futureToAwait.complete(v);
+                    }
+                });
+                wakeupTrigger.setActiveTask(futureToAwait);
+            } else {
+                futureToAwait = lastPendingAsyncCommit;
+            }
+            ConsumerUtils.getResult(futureToAwait, timer);
+            lastPendingAsyncCommit = null;
+        } finally {
+            if (!disableWakeup) wakeupTrigger.clearTask();
+            timer.update();
+        }

Review Comment:
   Do we want to clear out `lastPendingAsyncCommit` in the `finally` block, so that it is reset even when `ConsumerUtils.getResult` throws? e.g.:
   
   ```suggestion
               ConsumerUtils.getResult(futureToAwait, timer);
           } finally {
               lastPendingAsyncCommit = null;
               if (!disableWakeup) wakeupTrigger.clearTask();
               timer.update();
           }
   ```
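
   For reference, the behavioral difference between the two placements can be sketched with plain `CompletableFuture`s. `PendingCommitSketch` and its methods are hypothetical stand-ins, not the real `AsyncKafkaConsumer`, and a timed `get` stands in for `ConsumerUtils.getResult`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer state; for illustration only.
class PendingCommitSketch {
    CompletableFuture<Void> lastPending;

    // Placement from the diff: the field is cleared only if the wait succeeds.
    void awaitClearOnSuccess(long timeoutMs) throws Exception {
        if (lastPending == null) {
            return;
        }
        lastPending.get(timeoutMs, TimeUnit.MILLISECONDS);
        lastPending = null;
    }

    // Placement from the suggestion: the field is cleared on every exit path.
    void awaitClearInFinally(long timeoutMs) throws Exception {
        if (lastPending == null) {
            return;
        }
        try {
            lastPending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            lastPending = null;
        }
    }

    // Waits on a future that never completes, then reports whether the
    // field still references it after the timeout.
    static boolean stillPendingAfterTimeout(boolean clearInFinally) {
        PendingCommitSketch sketch = new PendingCommitSketch();
        sketch.lastPending = new CompletableFuture<>();
        try {
            if (clearInFinally) {
                sketch.awaitClearInFinally(10);
            } else {
                sketch.awaitClearOnSuccess(10);
            }
        } catch (Exception expected) {
            // A TimeoutException is expected here because the future never completes.
        }
        return sketch.lastPending != null;
    }

    public static void main(String[] args) {
        System.out.println(stillPendingAfterTimeout(false)); // true
        System.out.println(stillPendingAfterTimeout(true));  // false
    }
}
```

   With the `finally` placement, a commit that is still in flight after a timeout is no longer tracked by the field; whether that is the behavior we want is the open question.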


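   As a side note on the hunk above, the reason for awaiting a separate future can be shown in isolation. This is a sketch assuming the wake-up trigger completes whatever future it is handed exceptionally; `DerivedFutureSketch.mirror` is a hypothetical helper name, not a Kafka API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, for illustration only.
class DerivedFutureSketch {
    // Returns a new future that mirrors the outcome of source. Completing
    // the returned future directly does not propagate back to source.
    static <T> CompletableFuture<T> mirror(CompletableFuture<T> source) {
        CompletableFuture<T> derived = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error != null) {
                derived.completeExceptionally(error);
            } else {
                derived.complete(value);
            }
        });
        return derived;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pendingCommit = new CompletableFuture<>();
        CompletableFuture<Void> toAwait = mirror(pendingCommit);

        // Simulate a wake-up completing the awaited future exceptionally.
        toAwait.completeExceptionally(new RuntimeException("wakeup"));

        System.out.println(toAwait.isCompletedExceptionally()); // true
        System.out.println(pendingCommit.isDone());             // false

        // The original commit future can still complete normally later.
        pendingCommit.complete(null);
        System.out.println(pendingCommit.isDone());             // true
    }
}
```

   Handing the original future to the trigger instead would let a wake-up mark the commit itself as failed, which is what the comment in the hunk is guarding against.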

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
         }
     }
 
-    private void maybeInvokeCommitCallbacks() {
-        offsetCommitCallbackInvoker.executeCallbacks();
-    }
-

Review Comment:
   Any reason we don't want to keep this method abstraction?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
