lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1828337700


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent 
event) {
     }
 
     private void process(final AsyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().complete(Map.of());
             return;
         }
 
-        CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
-        future.whenComplete(complete(event.future()));
+        try {
+            CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(event.offsets());
+            future.whenComplete((offsets, throwable) -> {
+                if (throwable != null) {
+                    log.error("Committing offsets failed", throwable);
+                    event.future().completeExceptionally(throwable);
+                } else {
+                    event.future().complete(offsets);
+                }
+            });
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
     }
 
     private void process(final SyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().complete(Map.of());
             return;
         }
 
-        CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitSync(event.offsets(), 
event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        try {
+            CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitSync(event.offsets(), event.deadlineMs());
+            future.whenComplete((offsets, throwable) -> {
+                if (throwable != null) {
+                    log.error("Committing offsets failed", throwable);
+                    event.future().completeExceptionally(throwable);
+                } else {
+                    event.future().complete(offsets);
+                }
+            });

Review Comment:
   could we simplify this to `future.whenComplete(complete(event.future()))` ?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent 
event) {
     }
 
     private void process(final AsyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().complete(Map.of());
             return;
         }
 
-        CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
-        future.whenComplete(complete(event.future()));
+        try {
+            CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(event.offsets());
+            future.whenComplete((offsets, throwable) -> {
+                if (throwable != null) {
+                    log.error("Committing offsets failed", throwable);
+                    event.future().completeExceptionally(throwable);
+                } else {
+                    event.future().complete(offsets);
+                }
+            });
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
     }
 
     private void process(final SyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().complete(Map.of());

Review Comment:
   completeExceptionally?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java:
##########
@@ -20,13 +20,15 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Event to commit offsets without waiting for a response, so the request 
won't be retried.
+ * If no offsets are provided, this event will commit all offsets.

Review Comment:
   nit: all **consumed** offsets



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -274,13 +278,146 @@ public void testPollEnsureEmptyPendingRequestAfterPoll() 
{
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
         assertEquals(1, 
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
         
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
         assertEmptyPendingRequests(commitRequestManager);
     }
 
+    @Test
+    public void testCommitSync() {
+        subscriptionState = mock(SubscriptionState.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+
+        CommitRequestManager commitRequestManager = create(true, 100);

Review Comment:
   we don't need autoCommit enabled for this test, so even if it may be 
harmless maybe better to not mix it in to avoid confusion?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -187,22 +188,62 @@ private void process(final CreateFetchRequestsEvent 
event) {
 
     private void process(final AsyncCommitEvent event) {

Review Comment:
   I see, and I would say it's fine if we move it to the `commitMgr` as you 
did, as long as we update metadata before completing the result futures that 
the mgr returns 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1676,7 +1666,8 @@ public void testEnsurePollEventSentOnConsumerPoll() {
                 mock(ConsumerRebalanceListenerInvoker.class),
                 subscriptions,
                 "group-id",
-                "client-id");
+                "client-id",
+            false);

Review Comment:
   is indentation off here? (actually I think the "false" is the one that's ok, 
with 4 spaces, vs all the others with 8. I think we usually do 4 here)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent 
event) {
     }
 
     private void process(final AsyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().complete(Map.of());

Review Comment:
   Should we better complete exceptionally? I expect this would only happen 
during development phase if a bug is introduced, but still, it would be easier 
to catch if we make the app event fail because the expected manager is not 
present. I notice there's no consistency in how we handle these (some process 
calls complete exceptionally, others just return/complete, but I would lean 
towards not swallowing the unlikely/unexpected error here)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -596,8 +586,6 @@ public void testCommitAsyncLeaderEpochUpdate() {
         MockCommitCallback callback = new MockCommitCallback();
         assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, 
callback));
 
-        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);

Review Comment:
   as above, agree with removing this as the `updateLastSeenEpochIfNewer` is 
now done by the `commitReqMgr`, but then this test looses it's value, we should 
remove it (coverage replaced in the CommitReqMgrTest)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java:
##########
@@ -20,14 +20,15 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Event to commit offsets waiting for a response and retrying on expected 
retriable errors until
- * the timer expires.
+ * the timer expires. If no offsets are provided, this event will commit all 
offsets.

Review Comment:
   nit: ...all **consumed** offsets



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent 
event) {
     }
 
     private void process(final AsyncCommitEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
+        if (requestManagers.commitRequestManager.isEmpty()) {
+            event.future().complete(Map.of());
             return;
         }
 
-        CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
-        future.whenComplete(complete(event.future()));
+        try {
+            CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(event.offsets());
+            future.whenComplete((offsets, throwable) -> {
+                if (throwable != null) {
+                    log.error("Committing offsets failed", throwable);
+                    event.future().completeExceptionally(throwable);
+                } else {
+                    event.future().complete(offsets);
+                }
+            });

Review Comment:
   could we simplify this to `future.whenComplete(complete(event.future()))`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -225,7 +249,13 @@ private void process(final FetchCommittedOffsetsEvent 
event) {
         }
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.fetchOffsets(event.partitions(), event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        future.whenComplete((value, exception) -> {
+            if (exception != null)
+                event.future().completeExceptionally(exception);
+            else {
+                event.future().complete(value);
+            }
+        });

Review Comment:
   why not reusing `future.whenComplete(complete(event.future()))` like before?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -274,13 +278,146 @@ public void testPollEnsureEmptyPendingRequestAfterPoll() 
{
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        commitRequestManager.commitAsync(offsets);
+        commitRequestManager.commitAsync(Optional.of(offsets));
         assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
         assertEquals(1, 
commitRequestManager.poll(time.milliseconds()).unsentRequests.size());
         
assertTrue(commitRequestManager.unsentOffsetCommitRequests().isEmpty());
         assertEmptyPendingRequests(commitRequestManager);
     }
 
+    @Test
+    public void testCommitSync() {
+        subscriptionState = mock(SubscriptionState.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+
+        CommitRequestManager commitRequestManager = create(true, 100);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
+            Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
+        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
+            "topic",
+            1,
+            (short) 1,
+            Errors.NONE)));
+
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
+        assertTrue(future.isDone());
+        assertEquals(offsets, commitOffsets);
+    }
+
+    @Test
+    public void testCommitSyncWithEmptyOffsets() {
+        subscriptionState = mock(SubscriptionState.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+
+        CommitRequestManager commitRequestManager = create(true, 100);

Review Comment:
   ditto



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -439,14 +450,15 @@ private OffsetCommitRequestState 
createOffsetCommitRequest(final Map<TopicPartit
     }
 
     private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
-                                       CompletableFuture<Void> result) {
+                                       CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> result) {
         pendingRequests.addOffsetCommitRequest(requestAttempt);
 
         // Retry the same commit request while it fails with 
RetriableException and the retry
         // timeout hasn't expired.
         requestAttempt.future.whenComplete((res, error) -> {
             if (error == null) {
-                result.complete(null);
+                result.complete(requestAttempt.offsets);
+                maybeUpdateLastSeenEpochIfNewer(requestAttempt.offsets);

Review Comment:
   ditto



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -381,20 +386,22 @@ private void 
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * exceptionally depending on the response. If the request fails with a 
retriable error, the
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
-        if (offsets.isEmpty()) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
offsets.orElseGet(subscriptions::allConsumed);
+        if (commitOffsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.completedFuture(Map.of());
         }
-        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
+        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(commitOffsets, Long.MAX_VALUE);
         pendingRequests.addOffsetCommitRequest(commitRequest);
 
-        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
asyncCommitResult = new CompletableFuture<>();
         commitRequest.future.whenComplete((committedOffsets, error) -> {
             if (error != null) {
                 
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
             } else {
-                asyncCommitResult.complete(null);
+                asyncCommitResult.complete(commitOffsets);
+                maybeUpdateLastSeenEpochIfNewer(commitOffsets);

Review Comment:
   this should probably go before completing the "asyncCommitResult" future, to 
ensure that we don't signal the caller that the commit completed if we haven't 
updated the metadata yet.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -604,8 +605,6 @@ public void testCommitSyncLeaderEpochUpdate() {
 
         assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
 
-        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(metadata).updateLastSeenEpochIfNewer(t1, 1);

Review Comment:
   agree with removing the checks on metadata, but then I think we should 
remove the `testCommitSyncLeaderEpochUpdate` completely. The metadata update is 
not done in this component anymore, so this ends up simply testing commitSync, 
which is already covered. (coverage for the leader epoch update is now in the 
commitReqMgr, which does the metadata update) 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -532,6 +544,7 @@ private void fetchOffsetsWithRetries(final 
OffsetFetchRequestState fetchRequest,
             }
             if (error == null) {
                 result.complete(res);
+                maybeUpdateLastSeenEpochIfNewer(res);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to