lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1955228852


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -750,7 +750,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             do {
 
                 // Make sure to let the background thread know that we are 
still polling.

Review Comment:
   Should we update this comment? Maybe worth calling out that this will 
trigger async auto-commits of the consumed positions if needed (on the interval 
and for revoking partitions)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -566,7 +563,7 @@ private boolean 
isStaleEpochErrorAndValidEpochAvailable(Throwable error) {
         return error instanceof StaleMemberEpochException && 
memberInfo.memberEpoch.isPresent();
     }
 
-    public void updateAutoCommitTimer(final long currentTimeMs) {
+    void updateAutoCommitTimer(final long currentTimeMs) {

Review Comment:
   then this could be just private too



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -264,7 +263,7 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> requestAutoCom
      * In that case, the next auto-commit request will be sent on the next 
call to poll, after a
      * response for the in-flight is received.
      */
-    public void maybeAutoCommitAsync() {
+    void maybeAutoCommitAsync() {

Review Comment:
   since we added the new `updateTimerAndMaybeCommit` to encapsulate this, 
should this be private now? I checked and it's only used in this same class and 
the test (but I would say we should update the test to just use 
updateTimerAndMaybeCommit instead of updating the timer and calling the 
maybeAutoCommitAsync separately)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +634,11 @@ private void maybeUpdateLastSeenEpochIfNewer(final 
Map<TopicPartition, OffsetAnd
         });
     }
 
+    public void updateTimerAndMaybeCommit(final long currentTimeMs) {

Review Comment:
   could we add the java doc? Mainly to point out that this is a non-blocking 
func, updates the timer and triggers the async auto-commit of all consumed is 
needed.  



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -320,27 +321,17 @@ public void testCommitSync() {
     public void testCommitSyncWithEmptyOffsets() {
         subscriptionState = mock(SubscriptionState.class);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        TopicPartition tp = new TopicPartition("topic", 1);
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
-        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
-        doReturn(offsets).when(subscriptionState).allConsumed();
 
         CommitRequestManager commitRequestManager = create(false, 100);
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitSync(
-            Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
-        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
-        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
-        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
-            "topic",
-            1,
-            (short) 1,
-            Errors.NONE)));
+            Collections.emptyMap(), time.milliseconds() + defaultApiTimeoutMs);
+        assertEquals(0, 
commitRequestManager.unsentOffsetCommitRequests().size());
+        assertPoll(0, commitRequestManager);
 
-        verify(subscriptionState).allConsumed();
-        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
         assertTrue(future.isDone());
-        assertEquals(offsets, commitOffsets);

Review Comment:
   this was testing something different before (commit sync with empty offsets 
should use the allConsumed). That responsibility is not in this CommitMgr 
anymore, so ok to change this test, but we need to add coverage elsewhere (in 
the AppProcessorTest I expect, to test that process 
AsyncCommitEvent/SyncCommitEvent with empty offset retrieves the allConsumed 
and calls the commitMgr with them).
   
   Also, with this change, the `testCommitSyncWithEmptyAllConsumedOffsets` 
right below seems to be a duplicate now? 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -393,7 +383,7 @@ public void testCommitAsyncWithEmptyOffsets() {
         doReturn(offsets).when(subscriptionState).allConsumed();
 
         CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.empty());
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(offsets);

Review Comment:
   with this change I guess this test ends up being the same as the above one? 
we just need one, I would probably remove this one that deals with the 
subcsription state and that doesn't apply here anymore (and add the coverage in 
the AppEventProcessor for the case of allConsumed). 
   
   Also, the following `testCommitAsyncWithEmptyAllConsumedOffsets` should be 
renamed to `testCommitAsyncWithEmptyOffsets`.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -415,9 +404,8 @@ public void testCommitAsyncWithEmptyAllConsumedOffsets() {
         doReturn(Map.of()).when(subscriptionState).allConsumed();

Review Comment:
   I would expect we don't need this anymore?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -475,20 +477,24 @@ public void testSyncCommitEventWithException() {
         doReturn(future).when(commitRequestManager).commitSync(any(), 
anyLong());
         processor.process(event);
 
-        verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+        verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
+        assertTrue(event.offsetsReady.isDone());
         assertFutureThrows(event.future(), IllegalStateException.class);
     }
 
     @ParameterizedTest
     @MethodSource("offsetsGenerator")
     public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets) {

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -446,14 +445,17 @@ public void 
testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
     @MethodSource("offsetsGenerator")
     public void testSyncCommitEvent(Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets) {

Review Comment:
   this needs to be extended (or split) to ensure that if offsets are empty, it 
will use the subscriptions.allConsumed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to