philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1150022881


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -61,73 +83,250 @@ public void setup() {
     }
 
     @Test
-    public void testPoll() {
+    public void testPoll_EnsureManualCommitSent() {
         CommitRequestManager commitRequestManger = create(false, 0);
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertEquals(0, res.unsentRequests.size());
-        assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs);
+        assertPoll(0, commitRequestManger);
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManger.add(offsets);
-        res = commitRequestManger.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        commitRequestManger.addOffsetCommitRequest(offsets);
+        assertPoll(1, commitRequestManger);
     }
 
     @Test
-    public void testPollAndAutoCommit() {
+    public void testPoll_EnsureAutocommitSent() {
         CommitRequestManager commitRequestManger = create(true, 100);
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertEquals(0, res.unsentRequests.size());
-        assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs);
+        assertPoll(0, commitRequestManger);
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManger.clientPoll(time.milliseconds());
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
         when(subscriptionState.allConsumed()).thenReturn(offsets);
         time.sleep(100);
-        commitRequestManger.clientPoll(time.milliseconds());
-        res = commitRequestManger.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        assertPoll(1, commitRequestManger);
     }
 
     @Test
-    public void testAutocommitStateUponFailure() {
+    public void testPoll_EnsureCorrectInflightRequestBufferSize() {
+        CommitRequestManager commitManager = create(false, 100);
+
+        // Create some offset commit requests
+        Map<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<>();
+        offsets1.put(new TopicPartition("test", 0), new 
OffsetAndMetadata(10L));
+        offsets1.put(new TopicPartition("test", 1), new 
OffsetAndMetadata(20L));
+        Map<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<>();
+        offsets2.put(new TopicPartition("test", 3), new 
OffsetAndMetadata(20L));
+        offsets2.put(new TopicPartition("test", 4), new 
OffsetAndMetadata(20L));
+
+        // Add the requests to the CommitRequestManager and store their futures
+        ArrayList<CompletableFuture<ClientResponse>> commitFutures = new 
ArrayList<>();
+        ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
fetchFutures = new ArrayList<>();
+        commitFutures.add(commitManager.addOffsetCommitRequest(offsets1));
+        
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 0))));
+        commitFutures.add(commitManager.addOffsetCommitRequest(offsets2));
+        
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 1))));
+
+        // Poll the CommitRequestManager and verify that the 
inflightOffsetFetches size is correct
+        NetworkClientDelegate.PollResult result = 
commitManager.poll(time.milliseconds());
+        assertEquals(4, result.unsentRequests.size());
+        assertTrue(result.unsentRequests
+                .stream().anyMatch(r -> r.requestBuilder() instanceof 
OffsetCommitRequest.Builder));
+        assertTrue(result.unsentRequests
+                .stream().anyMatch(r -> r.requestBuilder() instanceof 
OffsetFetchRequest.Builder));
+        assertFalse(commitManager.pendingRequests.hasUnsentRequests());
+        assertEquals(2, 
commitManager.pendingRequests.inflightOffsetFetches.size());
+
+        // Verify that the inflight offset fetch requests have been removed 
from the pending request buffer
+        commitFutures.forEach(f -> f.complete(null));
+        fetchFutures.forEach(f -> f.complete(null));
+        assertEquals(0, 
commitManager.pendingRequests.inflightOffsetFetches.size());
+    }
+
+    @Test
+    public void testPoll_EnsureEmptyPendingRequestAfterPoll() {
         CommitRequestManager commitRequestManger = create(true, 100);
-        time.sleep(100);
-        commitRequestManger.clientPoll(time.milliseconds());
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        time.sleep(100);
-        // We want to make sure we don't resend autocommit if the previous 
request has not been completed
-        assertEquals(Long.MAX_VALUE, 
commitRequestManger.poll(time.milliseconds()).timeUntilNextPollMs);
+        commitRequestManger.addOffsetCommitRequest(new HashMap<>());
+        assertEquals(1, 
commitRequestManger.unsentOffsetCommitRequests().size());
+        commitRequestManger.poll(time.milliseconds());
+        assertTrue(commitRequestManger.unsentOffsetCommitRequests().isEmpty());
+        assertEmptyPendingRequests(commitRequestManger);
+    }
 
+    @Test
+    public void testAutocommit_ResendAutocommitAfterException() {
+        CommitRequestManager commitRequestManger = create(true, 100);
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        List<CompletableFuture<ClientResponse>> futures = assertPoll(1, 
commitRequestManger);
+        time.sleep(99);
         // complete the autocommit request (exceptionally)
-        res.unsentRequests.get(0).future().completeExceptionally(new 
KafkaException("test exception"));
+        futures.get(0).completeExceptionally(new KafkaException("test 
exception"));
 
         // we can then autocommit again
-        commitRequestManger.clientPoll(time.milliseconds());
-        res = commitRequestManger.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        assertPoll(0, commitRequestManger);
+        time.sleep(1);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        assertPoll(1, commitRequestManger);
+        assertEmptyPendingRequests(commitRequestManger);
     }
 
     @Test
-    public void testEnsureStagedCommitsPurgedAfterPoll() {
+    public void testAutocommit_EnsureOnlyOneInflightRequest() {
         CommitRequestManager commitRequestManger = create(true, 100);
-        commitRequestManger.add(new HashMap<>());
-        assertEquals(1, commitRequestManger.stagedCommits().size());
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertTrue(commitRequestManger.stagedCommits().isEmpty());
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        List<CompletableFuture<ClientResponse>> futures = assertPoll(1, 
commitRequestManger);
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());

Review Comment:
   @guozhangwang - I fixed the setting of the inflight request in the 
`maybeAutoCommit` method. This test should cover the issue raised in your comment.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -61,73 +83,250 @@ public void setup() {
     }
 
     @Test
-    public void testPoll() {
+    public void testPoll_EnsureManualCommitSent() {
         CommitRequestManager commitRequestManger = create(false, 0);
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertEquals(0, res.unsentRequests.size());
-        assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs);
+        assertPoll(0, commitRequestManger);
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManger.add(offsets);
-        res = commitRequestManger.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        commitRequestManger.addOffsetCommitRequest(offsets);
+        assertPoll(1, commitRequestManger);
     }
 
     @Test
-    public void testPollAndAutoCommit() {
+    public void testPoll_EnsureAutocommitSent() {
         CommitRequestManager commitRequestManger = create(true, 100);
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertEquals(0, res.unsentRequests.size());
-        assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs);
+        assertPoll(0, commitRequestManger);
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManger.clientPoll(time.milliseconds());
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
         when(subscriptionState.allConsumed()).thenReturn(offsets);
         time.sleep(100);
-        commitRequestManger.clientPoll(time.milliseconds());
-        res = commitRequestManger.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        assertPoll(1, commitRequestManger);
     }
 
     @Test
-    public void testAutocommitStateUponFailure() {
+    public void testPoll_EnsureCorrectInflightRequestBufferSize() {
+        CommitRequestManager commitManager = create(false, 100);
+
+        // Create some offset commit requests
+        Map<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<>();
+        offsets1.put(new TopicPartition("test", 0), new 
OffsetAndMetadata(10L));
+        offsets1.put(new TopicPartition("test", 1), new 
OffsetAndMetadata(20L));
+        Map<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<>();
+        offsets2.put(new TopicPartition("test", 3), new 
OffsetAndMetadata(20L));
+        offsets2.put(new TopicPartition("test", 4), new 
OffsetAndMetadata(20L));
+
+        // Add the requests to the CommitRequestManager and store their futures
+        ArrayList<CompletableFuture<ClientResponse>> commitFutures = new 
ArrayList<>();
+        ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
fetchFutures = new ArrayList<>();
+        commitFutures.add(commitManager.addOffsetCommitRequest(offsets1));
+        
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 0))));
+        commitFutures.add(commitManager.addOffsetCommitRequest(offsets2));
+        
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 1))));
+
+        // Poll the CommitRequestManager and verify that the 
inflightOffsetFetches size is correct
+        NetworkClientDelegate.PollResult result = 
commitManager.poll(time.milliseconds());
+        assertEquals(4, result.unsentRequests.size());
+        assertTrue(result.unsentRequests
+                .stream().anyMatch(r -> r.requestBuilder() instanceof 
OffsetCommitRequest.Builder));
+        assertTrue(result.unsentRequests
+                .stream().anyMatch(r -> r.requestBuilder() instanceof 
OffsetFetchRequest.Builder));
+        assertFalse(commitManager.pendingRequests.hasUnsentRequests());
+        assertEquals(2, 
commitManager.pendingRequests.inflightOffsetFetches.size());
+
+        // Verify that the inflight offset fetch requests have been removed 
from the pending request buffer
+        commitFutures.forEach(f -> f.complete(null));
+        fetchFutures.forEach(f -> f.complete(null));
+        assertEquals(0, 
commitManager.pendingRequests.inflightOffsetFetches.size());
+    }
+
+    @Test
+    public void testPoll_EnsureEmptyPendingRequestAfterPoll() {
         CommitRequestManager commitRequestManger = create(true, 100);
-        time.sleep(100);
-        commitRequestManger.clientPoll(time.milliseconds());
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        time.sleep(100);
-        // We want to make sure we don't resend autocommit if the previous 
request has not been completed
-        assertEquals(Long.MAX_VALUE, 
commitRequestManger.poll(time.milliseconds()).timeUntilNextPollMs);
+        commitRequestManger.addOffsetCommitRequest(new HashMap<>());
+        assertEquals(1, 
commitRequestManger.unsentOffsetCommitRequests().size());
+        commitRequestManger.poll(time.milliseconds());
+        assertTrue(commitRequestManger.unsentOffsetCommitRequests().isEmpty());
+        assertEmptyPendingRequests(commitRequestManger);
+    }
 
+    @Test
+    public void testAutocommit_ResendAutocommitAfterException() {
+        CommitRequestManager commitRequestManger = create(true, 100);
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        List<CompletableFuture<ClientResponse>> futures = assertPoll(1, 
commitRequestManger);
+        time.sleep(99);
         // complete the autocommit request (exceptionally)
-        res.unsentRequests.get(0).future().completeExceptionally(new 
KafkaException("test exception"));
+        futures.get(0).completeExceptionally(new KafkaException("test 
exception"));
 
         // we can then autocommit again
-        commitRequestManger.clientPoll(time.milliseconds());
-        res = commitRequestManger.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        assertPoll(0, commitRequestManger);
+        time.sleep(1);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        assertPoll(1, commitRequestManger);
+        assertEmptyPendingRequests(commitRequestManger);
     }
 
     @Test
-    public void testEnsureStagedCommitsPurgedAfterPoll() {
+    public void testAutocommit_EnsureOnlyOneInflightRequest() {
         CommitRequestManager commitRequestManger = create(true, 100);
-        commitRequestManger.add(new HashMap<>());
-        assertEquals(1, commitRequestManger.stagedCommits().size());
-        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertTrue(commitRequestManger.stagedCommits().isEmpty());
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        List<CompletableFuture<ClientResponse>> futures = assertPoll(1, 
commitRequestManger);
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        // We want to make sure we don't resend autocommit if the previous 
request has not been completed
+        assertPoll(0, commitRequestManger);
+        assertEmptyPendingRequests(commitRequestManger);
+
+        // complete the unsent request and re-poll
+        futures.get(0).complete(null);
+        assertPoll(1, commitRequestManger);
     }
 
     @Test
-    public void testAutoCommitFuture() {
+    public void testOffsetFetchRequest_EnsureDuplicatedRequestSucceed() {

Review Comment:
   I renamed the tests to make the intent of each one clearer.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -87,10 +102,58 @@ public void testSuccessfulStartupShutdown() {
     }
 
     @Test
-    public void testCommitAsync() {
+    public void testInvalidGroupId() {
+        this.groupId = null;
         consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
-        consumer.commitAsync();
-        verify(eventHandler).add(any());
+        assertThrows(InvalidGroupIdException.class, () -> 
consumer.committed(new HashSet<>()));
+    }
+
+    @Test
+    public void testCommitAsync_NullCallback() throws InterruptedException {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("my-topic", 0), new 
OffsetAndMetadata(100L));
+        offsets.put(new TopicPartition("my-topic", 1), new 
OffsetAndMetadata(200L));
+
+        PrototypeAsyncConsumer<?, ?> mockedConsumer = spy(newConsumer(time, 
new StringDeserializer(), new StringDeserializer()));
+        doReturn(future).when(mockedConsumer).commit(offsets);
+        mockedConsumer.commitAsync(offsets, null);
+        future.complete(null);
+        TestUtils.waitForCondition(() -> future.isDone(),
+                2000,
+                "commit future should complete");
+
+        assertFalse(future.isCompletedExceptionally());
+    }
+
+
+    @Test
+    public void testCommitAsync_UserSuppliedCallback() {

Review Comment:
   @guozhangwang - This test and the one above address your comment about a 
null `DefaultCommitCallback`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to