philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1150022881
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -61,73 +83,250 @@ public void setup() { } @Test - public void testPoll() { + public void testPoll_EnsureManualCommitSent() { CommitRequestManager commitRequestManger = create(false, 0); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertEquals(0, res.unsentRequests.size()); - assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs); + assertPoll(0, commitRequestManger); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); - commitRequestManger.add(offsets); - res = commitRequestManger.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + commitRequestManger.addOffsetCommitRequest(offsets); + assertPoll(1, commitRequestManger); } @Test - public void testPollAndAutoCommit() { + public void testPoll_EnsureAutocommitSent() { CommitRequestManager commitRequestManger = create(true, 100); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertEquals(0, res.unsentRequests.size()); - assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs); + assertPoll(0, commitRequestManger); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); - commitRequestManger.clientPoll(time.milliseconds()); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); when(subscriptionState.allConsumed()).thenReturn(offsets); time.sleep(100); - commitRequestManger.clientPoll(time.milliseconds()); - res = commitRequestManger.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + assertPoll(1, commitRequestManger); } @Test - public void testAutocommitStateUponFailure() { + public void testPoll_EnsureCorrectInflightRequestBufferSize() 
{ + CommitRequestManager commitManager = create(false, 100); + + // Create some offset commit requests + Map<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<>(); + offsets1.put(new TopicPartition("test", 0), new OffsetAndMetadata(10L)); + offsets1.put(new TopicPartition("test", 1), new OffsetAndMetadata(20L)); + Map<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<>(); + offsets2.put(new TopicPartition("test", 3), new OffsetAndMetadata(20L)); + offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L)); + + // Add the requests to the CommitRequestManager and store their futures + ArrayList<CompletableFuture<ClientResponse>> commitFutures = new ArrayList<>(); + ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> fetchFutures = new ArrayList<>(); + commitFutures.add(commitManager.addOffsetCommitRequest(offsets1)); + fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 0)))); + commitFutures.add(commitManager.addOffsetCommitRequest(offsets2)); + fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 1)))); + + // Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct + NetworkClientDelegate.PollResult result = commitManager.poll(time.milliseconds()); + assertEquals(4, result.unsentRequests.size()); + assertTrue(result.unsentRequests + .stream().anyMatch(r -> r.requestBuilder() instanceof OffsetCommitRequest.Builder)); + assertTrue(result.unsentRequests + .stream().anyMatch(r -> r.requestBuilder() instanceof OffsetFetchRequest.Builder)); + assertFalse(commitManager.pendingRequests.hasUnsentRequests()); + assertEquals(2, commitManager.pendingRequests.inflightOffsetFetches.size()); + + // Verify that the inflight offset fetch requests have been removed from the pending request buffer + commitFutures.forEach(f -> f.complete(null)); + fetchFutures.forEach(f -> f.complete(null)); + assertEquals(0, 
commitManager.pendingRequests.inflightOffsetFetches.size()); + } + + @Test + public void testPoll_EnsureEmptyPendingRequestAfterPoll() { CommitRequestManager commitRequestManger = create(true, 100); - time.sleep(100); - commitRequestManger.clientPoll(time.milliseconds()); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - time.sleep(100); - // We want to make sure we don't resend autocommit if the previous request has not been completed - assertEquals(Long.MAX_VALUE, commitRequestManger.poll(time.milliseconds()).timeUntilNextPollMs); + commitRequestManger.addOffsetCommitRequest(new HashMap<>()); + assertEquals(1, commitRequestManger.unsentOffsetCommitRequests().size()); + commitRequestManger.poll(time.milliseconds()); + assertTrue(commitRequestManger.unsentOffsetCommitRequests().isEmpty()); + assertEmptyPendingRequests(commitRequestManger); + } + @Test + public void testAutocommit_ResendAutocommitAfterException() { + CommitRequestManager commitRequestManger = create(true, 100); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + List<CompletableFuture<ClientResponse>> futures = assertPoll(1, commitRequestManger); + time.sleep(99); // complete the autocommit request (exceptionally) - res.unsentRequests.get(0).future().completeExceptionally(new KafkaException("test exception")); + futures.get(0).completeExceptionally(new KafkaException("test exception")); // we can then autocommit again - commitRequestManger.clientPoll(time.milliseconds()); - res = commitRequestManger.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + assertPoll(0, commitRequestManger); + time.sleep(1); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + assertPoll(1, commitRequestManger); + assertEmptyPendingRequests(commitRequestManger); } @Test - public void testEnsureStagedCommitsPurgedAfterPoll() { + public void 
testAutocommit_EnsureOnlyOneInflightRequest() { CommitRequestManager commitRequestManger = create(true, 100); - commitRequestManger.add(new HashMap<>()); - assertEquals(1, commitRequestManger.stagedCommits().size()); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertTrue(commitRequestManger.stagedCommits().isEmpty()); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + List<CompletableFuture<ClientResponse>> futures = assertPoll(1, commitRequestManger); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); Review Comment: @guozhangwang - Fixed setting the inflight request in the `maybeAutoCommit` method. This test should address this issue/comment. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -61,73 +83,250 @@ public void setup() { } @Test - public void testPoll() { + public void testPoll_EnsureManualCommitSent() { CommitRequestManager commitRequestManger = create(false, 0); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertEquals(0, res.unsentRequests.size()); - assertEquals(Long.MAX_VALUE, res.timeUntilNextPollMs); + assertPoll(0, commitRequestManger); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); - commitRequestManger.add(offsets); - res = commitRequestManger.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + commitRequestManger.addOffsetCommitRequest(offsets); + assertPoll(1, commitRequestManger); } @Test - public void testPollAndAutoCommit() { + public void testPoll_EnsureAutocommitSent() { CommitRequestManager commitRequestManger = create(true, 100); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertEquals(0, res.unsentRequests.size()); - assertEquals(Long.MAX_VALUE, 
res.timeUntilNextPollMs); + assertPoll(0, commitRequestManger); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); - commitRequestManger.clientPoll(time.milliseconds()); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); when(subscriptionState.allConsumed()).thenReturn(offsets); time.sleep(100); - commitRequestManger.clientPoll(time.milliseconds()); - res = commitRequestManger.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + assertPoll(1, commitRequestManger); } @Test - public void testAutocommitStateUponFailure() { + public void testPoll_EnsureCorrectInflightRequestBufferSize() { + CommitRequestManager commitManager = create(false, 100); + + // Create some offset commit requests + Map<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<>(); + offsets1.put(new TopicPartition("test", 0), new OffsetAndMetadata(10L)); + offsets1.put(new TopicPartition("test", 1), new OffsetAndMetadata(20L)); + Map<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<>(); + offsets2.put(new TopicPartition("test", 3), new OffsetAndMetadata(20L)); + offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L)); + + // Add the requests to the CommitRequestManager and store their futures + ArrayList<CompletableFuture<ClientResponse>> commitFutures = new ArrayList<>(); + ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> fetchFutures = new ArrayList<>(); + commitFutures.add(commitManager.addOffsetCommitRequest(offsets1)); + fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 0)))); + commitFutures.add(commitManager.addOffsetCommitRequest(offsets2)); + fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 1)))); + + // Poll the CommitRequestManager and verify that the 
inflightOffsetFetches size is correct + NetworkClientDelegate.PollResult result = commitManager.poll(time.milliseconds()); + assertEquals(4, result.unsentRequests.size()); + assertTrue(result.unsentRequests + .stream().anyMatch(r -> r.requestBuilder() instanceof OffsetCommitRequest.Builder)); + assertTrue(result.unsentRequests + .stream().anyMatch(r -> r.requestBuilder() instanceof OffsetFetchRequest.Builder)); + assertFalse(commitManager.pendingRequests.hasUnsentRequests()); + assertEquals(2, commitManager.pendingRequests.inflightOffsetFetches.size()); + + // Verify that the inflight offset fetch requests have been removed from the pending request buffer + commitFutures.forEach(f -> f.complete(null)); + fetchFutures.forEach(f -> f.complete(null)); + assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size()); + } + + @Test + public void testPoll_EnsureEmptyPendingRequestAfterPoll() { CommitRequestManager commitRequestManger = create(true, 100); - time.sleep(100); - commitRequestManger.clientPoll(time.milliseconds()); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - time.sleep(100); - // We want to make sure we don't resend autocommit if the previous request has not been completed - assertEquals(Long.MAX_VALUE, commitRequestManger.poll(time.milliseconds()).timeUntilNextPollMs); + commitRequestManger.addOffsetCommitRequest(new HashMap<>()); + assertEquals(1, commitRequestManger.unsentOffsetCommitRequests().size()); + commitRequestManger.poll(time.milliseconds()); + assertTrue(commitRequestManger.unsentOffsetCommitRequests().isEmpty()); + assertEmptyPendingRequests(commitRequestManger); + } + @Test + public void testAutocommit_ResendAutocommitAfterException() { + CommitRequestManager commitRequestManger = create(true, 100); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + List<CompletableFuture<ClientResponse>> futures = assertPoll(1, commitRequestManger); + time.sleep(99); 
// complete the autocommit request (exceptionally) - res.unsentRequests.get(0).future().completeExceptionally(new KafkaException("test exception")); + futures.get(0).completeExceptionally(new KafkaException("test exception")); // we can then autocommit again - commitRequestManger.clientPoll(time.milliseconds()); - res = commitRequestManger.poll(time.milliseconds()); - assertEquals(1, res.unsentRequests.size()); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + assertPoll(0, commitRequestManger); + time.sleep(1); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + assertPoll(1, commitRequestManger); + assertEmptyPendingRequests(commitRequestManger); } @Test - public void testEnsureStagedCommitsPurgedAfterPoll() { + public void testAutocommit_EnsureOnlyOneInflightRequest() { CommitRequestManager commitRequestManger = create(true, 100); - commitRequestManger.add(new HashMap<>()); - assertEquals(1, commitRequestManger.stagedCommits().size()); - NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); - assertTrue(commitRequestManger.stagedCommits().isEmpty()); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + List<CompletableFuture<ClientResponse>> futures = assertPoll(1, commitRequestManger); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + // We want to make sure we don't resend autocommit if the previous request has not been completed + assertPoll(0, commitRequestManger); + assertEmptyPendingRequests(commitRequestManger); + + // complete the unsent request and re-poll + futures.get(0).complete(null); + assertPoll(1, commitRequestManger); } @Test - public void testAutoCommitFuture() { + public void testOffsetFetchRequest_EnsureDuplicatedRequestSucceed() { Review Comment: I renamed the tests to make the intent of each one clearer. ########## 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java: ########## @@ -87,10 +102,58 @@ public void testSuccessfulStartupShutdown() { } @Test - public void testCommitAsync() { + public void testInvalidGroupId() { + this.groupId = null; consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer()); - consumer.commitAsync(); - verify(eventHandler).add(any()); + assertThrows(InvalidGroupIdException.class, () -> consumer.committed(new HashSet<>())); + } + + @Test + public void testCommitAsync_NullCallback() throws InterruptedException { + CompletableFuture<Void> future = new CompletableFuture<>(); + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L)); + offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); + + PrototypeAsyncConsumer<?, ?> mockedConsumer = spy(newConsumer(time, new StringDeserializer(), new StringDeserializer())); + doReturn(future).when(mockedConsumer).commit(offsets); + mockedConsumer.commitAsync(offsets, null); + future.complete(null); + TestUtils.waitForCondition(() -> future.isDone(), + 2000, + "commit future should complete"); + + assertFalse(future.isCompletedExceptionally()); + } + + + @Test + public void testCommitAsync_UserSuppliedCallback() { Review Comment: @guozhangwang - This test and the one above address your comment about a null DefaultCommitCallback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org