lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1820586362


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -911,7 +911,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
             wakeupTrigger.setActiveTask(event.future());
             try {
                 final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event);
-                committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
+                ConsumerUtils.maybeUpdateLastSeenEpochIfNewer(metadata, committedOffsets);

Review Comment:
   The metadata updates have been progressively moved to the background. Should we do the last push here and move this `maybeUpdateLastSeenEpochIfNewer` call to the background `ApplicationEventProcessor`? (It's already called from there for all commit events; if we also move this call and the one in seek into their events' process calls, we would probably be done with it and have the metadata consistently updated only in the background.)
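   
   Roughly, something like the sketch below in `ApplicationEventProcessor` (a sketch only: the event/method/field names here are assumed from context, not copied from the actual class):
   
   ```java
   // Hypothetical sketch: complete the epoch update in the background processor
   // so the application thread no longer touches metadata directly.
   private void process(final FetchCommittedOffsetsEvent event) {
       requestManagers.commitRequestManager.get()
           .fetchOffsets(event.partitions(), event.deadlineMs())
           .whenComplete((offsets, error) -> {
               if (error != null) {
                   event.future().completeExceptionally(error);
               } else {
                   // Moved from AsyncKafkaConsumer.committed(): metadata is now
                   // updated only in the background thread.
                   ConsumerUtils.maybeUpdateLastSeenEpochIfNewer(metadata, offsets);
                   event.future().complete(offsets);
               }
           });
   }
   ```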



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##########
@@ -258,4 +258,11 @@ public static KafkaException maybeWrapAsKafkaException(Throwable t, String messa
         else
             return new KafkaException(message, t);
     }
+
+    public static void maybeUpdateLastSeenEpochIfNewer(ConsumerMetadata metadata, final Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   Once we have all the calls to this metadata update consistently in the background, it will probably make sense to move this helper closer to `ApplicationEventProcessor`, where it's used?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -244,6 +244,130 @@ public void testSubscriptionChangeEvent() {
         verify(membershipManager, never()).onConsumerPoll();
     }
 
+    @Test
+    public void testSyncCommitEventWithOffsets() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);

Review Comment:
   Is this line needed? I would expect that we only care about verifying that the metadata was updated, which is done on ln 260; there should be no need to stub a specific return value.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -244,6 +244,130 @@ public void testSubscriptionChangeEvent() {
         verify(membershipManager, never()).onConsumerPoll();
     }
 
+    @Test
+    public void testSyncCommitEventWithOffsets() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+
+        processor.process(event);
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithEmptyOffsets() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(Collections.emptyMap()), 12345);
+
+        setupProcessor(true);
+
+        processor.process(event);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(committedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testSyncCommitEventWithAllConsumed() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+
+        processor.process(event);
+        verify(subscriptionState).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithEmptyAllConsumed() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+
+        setupProcessor(true);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
+
+        processor.process(event);
+        verify(subscriptionState).allConsumed();
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(committedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testAsyncCommitEventWithOffsets() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(offsets));
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitAsync(offsets);
+
+        processor.process(event);
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitAsync(offsets);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testAsyncCommitEventWithEmptyOffsets() {
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(Collections.emptyMap()));
+
+        setupProcessor(true);
+
+        processor.process(event);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(committedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testAsyncCommitEventWithAllConsumed() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.empty());
+
+        setupProcessor(true);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);

Review Comment:
   needed?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -244,6 +244,130 @@ public void testSubscriptionChangeEvent() {
         verify(membershipManager, never()).onConsumerPoll();
     }
 
+    @Test
+    public void testSyncCommitEventWithOffsets() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+
+        processor.process(event);
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithEmptyOffsets() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(Collections.emptyMap()), 12345);
+
+        setupProcessor(true);
+
+        processor.process(event);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(committedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testSyncCommitEventWithAllConsumed() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);

Review Comment:
   needed? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -743,18 +743,22 @@ public void commitAsync() {
 
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
-        commitAsync(subscriptions.allConsumed(), callback);
+        commitAsync(Optional.empty(), callback);
     }
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        commitAsync(Optional.of(offsets), callback);
+    }
+
+    private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
             AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
-            lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((r, t) -> {
+            lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((committedOffsets, t) -> {

Review Comment:
   Nit: should we rename `t` to `error` or something clearer, just like you did with `r`?
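   
   For illustration (only the parameter name changes; the lambda body stays as in the PR):
   
   ```java
   lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((committedOffsets, error) -> {
       // ... same body as in the PR, with the failure branch reading off 'error'
   });
   ```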



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java:
##########
@@ -20,14 +20,15 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Event to commit offsets waiting for a response and retrying on expected retriable errors until
  * the timer expires.

Review Comment:
   maybe worth adding that if the event contains no offsets it will commit all 
consumed offsets retrieved from the subscription state, makes sense? 
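   
   A possible wording for the javadoc (sketch only):
   
   ```java
   /**
    * Event to commit offsets, waiting for a response and retrying on expected retriable errors
    * until the timer expires. If the event carries no offsets, all consumed offsets retrieved
    * from the subscription state are committed instead.
    */
   ```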



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -244,6 +244,130 @@ public void testSubscriptionChangeEvent() {
         verify(membershipManager, never()).onConsumerPoll();
     }
 
+    @Test
+    public void testSyncCommitEventWithOffsets() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+
+        processor.process(event);
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithEmptyOffsets() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(Collections.emptyMap()), 12345);
+
+        setupProcessor(true);
+
+        processor.process(event);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());

Review Comment:
   what about verifying that we don't generate requests for empty offsets?
   
   `verify(commitRequestManager, never()).commitSync(any(), anyLong());`
   
   (let's add it also to the other tests that play with the different commits 
and empty offsets)
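   
   Applied to this test, that could look like (sketch; `any()`/`anyLong()` are the standard Mockito argument matchers):
   
   ```java
   processor.process(event);
   // No commit request should be generated when the offsets map is empty.
   verify(commitRequestManager, never()).commitSync(any(), anyLong());
   Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
   assertTrue(committedOffsets.isEmpty());
   ```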



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -244,6 +244,130 @@ public void testSubscriptionChangeEvent() {
         verify(membershipManager, never()).onConsumerPoll();
     }
 
+    @Test
+    public void testSyncCommitEventWithOffsets() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+
+        processor.process(event);
+        verify(subscriptionState, never()).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithEmptyOffsets() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.of(Collections.emptyMap()), 12345);
+
+        setupProcessor(true);
+
+        processor.process(event);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(committedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testSyncCommitEventWithAllConsumed() {
+        final long deadlineMs = 12345;
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), deadlineMs);
+
+        setupProcessor(true);
+        doReturn(offsets).when(subscriptionState).allConsumed();
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
+
+        processor.process(event);
+        verify(subscriptionState).allConsumed();
+        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+        verify(commitRequestManager).commitSync(offsets, deadlineMs);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertEquals(offsets, committedOffsets);
+    }
+
+    @Test
+    public void testSyncCommitEventWithEmptyAllConsumed() {
+        SyncCommitEvent event = new SyncCommitEvent(Optional.empty(), 12345);
+
+        setupProcessor(true);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
+
+        processor.process(event);
+        verify(subscriptionState).allConsumed();
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
+        assertTrue(committedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testAsyncCommitEventWithOffsets() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+        AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(offsets));
+
+        setupProcessor(true);
+        doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);

Review Comment:
   needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
