lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1323351275
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ########## @@ -494,6 +499,109 @@ public void testResetPositionsSuccess_LeaderEpochInResponse() { verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, leaderAndEpoch.epoch.get()); } + @Test + public void testValidatePositionsSuccess() { + int currentOffset = 5; + int expectedEndOffset = 100; + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(3)); + TopicPartition tp = TEST_PARTITION_1; + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset, + Optional.of(10), leaderAndEpoch); + + expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1); + + requestManager.validatePositionsIfNeeded(); + assertEquals(1, requestManager.requestsToSend(), "Invalid request count"); + + verify(subscriptionState).setNextAllowedRetry(any(), anyLong()); + + // Validate positions response with end offsets + when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1, leaderAndEpoch.epoch)); + NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); + ClientResponse clientResponse = buildOffsetsForLeaderEpochResponse(unsentRequest, + Collections.singletonList(tp), expectedEndOffset); + clientResponse.onComplete(); + assertTrue(unsentRequest.future().isDone()); + assertFalse(unsentRequest.future().isCompletedExceptionally()); + verify(subscriptionState).maybeCompleteValidation(any(), any(), any()); + } + + @Test + public void testValidatePositionsMissingLeader() { + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(Node.noNode()), + Optional.of(5)); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(5L, + Optional.of(10), leaderAndEpoch); + 
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + NodeApiVersions nodeApiVersions = NodeApiVersions.create(); + when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions); + requestManager.validatePositionsIfNeeded(); + verify(metadata).requestUpdate(true); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testValidatePositionsFailureWithUnrecoverableAuthException() { + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(5)); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(5L, + Optional.of(10), leaderAndEpoch); + expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1); + + requestManager.validatePositionsIfNeeded(); + + // Validate positions response with TopicAuthorizationException + NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); + ClientResponse clientResponse = + buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest, Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED)); + clientResponse.onComplete(); + + assertTrue(unsentRequest.future().isDone()); + assertFalse(unsentRequest.future().isCompletedExceptionally()); + + // Following validatePositions should raise the previous exception without performing any + // request + assertThrows(TopicAuthorizationException.class, () -> requestManager.validatePositionsIfNeeded()); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() { + int currentOffset = 5; + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(3)); + 
+ SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset, + Optional.of(10), leaderAndEpoch); + + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + + // No api version info initially available + when(apiVersions.get(LEADER_1.idString())).thenReturn(null); + requestManager.validatePositionsIfNeeded(); + assertEquals(0, requestManager.requestsToSend(), "Invalid request count"); + verify(subscriptionState, never()).completeValidation(TEST_PARTITION_1); + verify(subscriptionState, never()).setNextAllowedRetry(any(), anyLong()); + + // Api version updated, next validate positions should successfully build the request + when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create()); + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + requestManager.validatePositionsIfNeeded(); + assertEquals(1, requestManager.requestsToSend(), "Invalid request count"); + } + + private void expectSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition position, Node leader) { + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); Review Comment: Yes, it's called twice in `getPartitionsToValidate` [here](https://github.com/apache/kafka/blob/4be174f5c1eed700411f0806a2eb391cee3f1177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L186) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org