junrao commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1323461724
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ########## @@ -455,6 +468,192 @@ public void testRequestFails_AuthenticationException() { assertEquals(0, requestManager.requestsToSend()); } + @Test + public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() { + when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet()); + requestManager.resetPositionsIfNeeded(); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testResetPositionsMissingLeader() { + expectFailedRequest_MissingLeader(); + when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST); + requestManager.resetPositionsIfNeeded(); + verify(metadata).requestUpdate(true); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testResetPositionsSuccess_NoLeaderEpochInResponse() { + testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch()); + verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt()); + } + + @Test + public void testResetPositionsSuccess_LeaderEpochInResponse() { + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(5)); + testResetPositionsSuccessWithLeaderEpoch(leaderAndEpoch); + verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, leaderAndEpoch.epoch.get()); + } + + @Test + public void testValidatePositionsSuccess() { + int currentOffset = 5; + int expectedEndOffset = 100; + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(3)); + TopicPartition tp = TEST_PARTITION_1; + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset, + Optional.of(10), leaderAndEpoch); + + 
expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1); + + requestManager.validatePositionsIfNeeded(); + assertEquals(1, requestManager.requestsToSend(), "Invalid request count"); + + verify(subscriptionState).setNextAllowedRetry(any(), anyLong()); + + // Validate positions response with end offsets + when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1, leaderAndEpoch.epoch)); + NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); + ClientResponse clientResponse = buildOffsetsForLeaderEpochResponse(unsentRequest, + Collections.singletonList(tp), expectedEndOffset); + clientResponse.onComplete(); + assertTrue(unsentRequest.future().isDone()); + assertFalse(unsentRequest.future().isCompletedExceptionally()); + verify(subscriptionState).maybeCompleteValidation(any(), any(), any()); + } + + @Test + public void testValidatePositionsMissingLeader() { + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(Node.noNode()), + Optional.of(5)); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(5L, + Optional.of(10), leaderAndEpoch); + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + NodeApiVersions nodeApiVersions = NodeApiVersions.create(); + when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions); + requestManager.validatePositionsIfNeeded(); + verify(metadata).requestUpdate(true); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testValidatePositionsFailureWithUnrecoverableAuthException() { + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(5)); + SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L, + Optional.of(10), leaderAndEpoch); + expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1); + + requestManager.validatePositionsIfNeeded(); + + // Validate positions response with TopicAuthorizationException + NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); + ClientResponse clientResponse = + buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest, Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED)); + clientResponse.onComplete(); + + assertTrue(unsentRequest.future().isDone()); + assertFalse(unsentRequest.future().isCompletedExceptionally()); + + // Following validatePositions should raise the previous exception without performing any + // request + assertThrows(TopicAuthorizationException.class, () -> requestManager.validatePositionsIfNeeded()); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() { + int currentOffset = 5; + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1), + Optional.of(3)); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset, + Optional.of(10), leaderAndEpoch); + + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + + // No api version info initially available + when(apiVersions.get(LEADER_1.idString())).thenReturn(null); + requestManager.validatePositionsIfNeeded(); + assertEquals(0, requestManager.requestsToSend(), "Invalid request count"); + verify(subscriptionState, never()).completeValidation(TEST_PARTITION_1); + verify(subscriptionState, never()).setNextAllowedRetry(any(), anyLong()); + + // Api version updated, next 
validate positions should successfully build the request + when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create()); + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + requestManager.validatePositionsIfNeeded(); + assertEquals(1, requestManager.requestsToSend(), "Invalid request count"); + } + + private void expectSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition position, Node leader) { + when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); + when(subscriptionState.position(any())).thenReturn(position, position); + NodeApiVersions nodeApiVersions = NodeApiVersions.create(); + when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions); + } + + private void testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch leaderAndEpoch) { + TopicPartition tp = TEST_PARTITION_1; + Node leader = LEADER_1; + OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST; + long offset = 5L; + Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(tp, + new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch)); + when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp)); + when(subscriptionState.resetStrategy(any())).thenReturn(strategy); + expectSuccessfulRequest(Collections.singletonMap(tp, leader)); + + requestManager.resetPositionsIfNeeded(); + assertEquals(1, requestManager.requestsToSend()); + + // Reset positions response with offsets + when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader, leaderAndEpoch.epoch)); + NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); + 
ClientResponse clientResponse = buildClientResponse(unsentRequest, expectedOffsets); + clientResponse.onComplete(); + assertTrue(unsentRequest.future().isDone()); + assertFalse(unsentRequest.future().isCompletedExceptionally()); + } + + @Test + public void testResetPositionsThrowsPreviousException() { Review Comment: Sorry, I left the comment on the wrong line. I meant to point out that `testValidatePositionsFailureWithUnrecoverableAuthException` and `testResetPositionsThrowsPreviousException` look very similar. Are they redundant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org