kirktrue commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1823388056


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -508,7 +505,7 @@ public void 
testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() {
                 mock(ConsumerRebalanceListenerInvoker.class),
                 subscriptions,
                 "group-id",
-                "client-id");
+                "client-id", false);

Review Comment:
   Can you move the new `false` parameter to be aligned with the other 
parameters?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -899,7 +893,7 @@ public void testAutoCommitSyncDisabled() {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id");
+            "client-id", false);

Review Comment:
   Same alignment issue here...



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1300,13 +1297,12 @@ private void releaseAssignmentAndLeaveGroup(final Timer 
timer) {
 
     // Visible for testing
     void commitSyncAllConsumed(final Timer timer) {
-        Map<TopicPartition, OffsetAndMetadata> allConsumed = 
subscriptions.allConsumed();
-        log.debug("Sending synchronous auto-commit of offsets {} on closing", 
allConsumed);
+        log.debug("Sending synchronous auto-commit on closing");
         try {
-            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            commitSync(Duration.ofMillis(timer.remainingMs()));
         } catch (Exception e) {
             // consistent with async auto-commit failures, we do not propagate 
the exception
-            log.warn("Synchronous auto-commit of offsets {} failed: {}", 
allConsumed, e.getMessage());
+            log.warn("Synchronous auto-commit failed: {}", e.getMessage());

Review Comment:
   This is unrelated to this PR, but should we log the stack trace of the 
error? In `commitAsync()` we log the stack trace, not just the message.
   
   ```suggestion
               log.warn("Synchronous auto-commit failed", e);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -187,22 +188,62 @@ private void process(final CreateFetchRequestsEvent 
event) {
 
     private void process(final AsyncCommitEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
+            event.future().complete(null);
             return;
         }
 
-        CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
-        future.whenComplete(complete(event.future()));
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
event.offsets().isPresent() ?
+                event.offsets().get() : subscriptions.allConsumed();

Review Comment:
   Minor, but in my opinion this is a little clearer:
   
   ```suggestion
           Map<TopicPartition, OffsetAndMetadata> offsets = 
event.offsets().orElseGet(subscriptions::allConsumed);
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -846,7 +836,7 @@ public void testUnsubscribeOnClose() {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id"));
+            "client-id", false));

Review Comment:
   Same alignment issue here...



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -187,22 +188,62 @@ private void process(final CreateFetchRequestsEvent 
event) {
 
     private void process(final AsyncCommitEvent event) {

Review Comment:
   Can we move the bulk of this logic into a new `CommitRequestManager` method? 
We try to keep the `process()` methods lightweight.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -881,13 +871,17 @@ public void testAutoCommitSyncEnabled() {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id");
+            "client-id", false);

Review Comment:
   Same alignment issue here...



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1700,7 +1690,7 @@ public void testEnsurePollEventSentOnConsumerPoll() {
                 mock(ConsumerRebalanceListenerInvoker.class),
                 subscriptions,
                 "group-id",
-                "client-id");
+                "client-id", false);

Review Comment:
   Same alignment issue here...



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -863,7 +853,7 @@ public void testFailedPartitionRevocationOnClose() {
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
-            "client-id"));
+            "client-id", false));

Review Comment:
   Same alignment issue here...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to