lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1323467149
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -455,6 +468,192 @@ public void testRequestFails_AuthenticationException() {
assertEquals(0, requestManager.requestsToSend());
}
+ @Test
+ public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet());
+ requestManager.resetPositionsIfNeeded();
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testResetPositionsMissingLeader() {
+ expectFailedRequest_MissingLeader();
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+ requestManager.resetPositionsIfNeeded();
+ verify(metadata).requestUpdate(true);
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
+
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+ verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+ }
+
+ @Test
+ public void testResetPositionsSuccess_LeaderEpochInResponse() {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(5));
+ testResetPositionsSuccessWithLeaderEpoch(leaderAndEpoch);
+ verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1,
leaderAndEpoch.epoch.get());
+ }
+
+ @Test
+ public void testValidatePositionsSuccess() {
+ int currentOffset = 5;
+ int expectedEndOffset = 100;
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(3));
+ TopicPartition tp = TEST_PARTITION_1;
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(currentOffset,
+ Optional.of(10), leaderAndEpoch);
+
+ expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+ requestManager.validatePositionsIfNeeded();
+ assertEquals(1, requestManager.requestsToSend(), "Invalid request
count");
+
+ verify(subscriptionState).setNextAllowedRetry(any(), anyLong());
+
+ // Validate positions response with end offsets
+ when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1,
leaderAndEpoch.epoch));
+ NetworkClientDelegate.PollResult pollResult =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
pollResult.unsentRequests.get(0);
+ ClientResponse clientResponse =
buildOffsetsForLeaderEpochResponse(unsentRequest,
+ Collections.singletonList(tp), expectedEndOffset);
+ clientResponse.onComplete();
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+ verify(subscriptionState).maybeCompleteValidation(any(), any(), any());
+ }
+
+ @Test
+ public void testValidatePositionsMissingLeader() {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(Node.noNode()),
+ Optional.of(5));
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(5L,
+ Optional.of(10), leaderAndEpoch);
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+ NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+ when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
+ requestManager.validatePositionsIfNeeded();
+ verify(metadata).requestUpdate(true);
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void testValidatePositionsFailureWithUnrecoverableAuthException() {
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(5));
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(5L,
+ Optional.of(10), leaderAndEpoch);
+ expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+ requestManager.validatePositionsIfNeeded();
+
+ // Validate positions response with TopicAuthorizationException
+ NetworkClientDelegate.PollResult res =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
res.unsentRequests.get(0);
+ ClientResponse clientResponse =
+ buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest,
Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
+ clientResponse.onComplete();
+
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+ // Following validatePositions should raise the previous exception
without performing any
+ // request
+ assertThrows(TopicAuthorizationException.class, () ->
requestManager.validatePositionsIfNeeded());
+ assertEquals(0, requestManager.requestsToSend());
+ }
+
+ @Test
+ public void
testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
+ int currentOffset = 5;
+ Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+ Optional.of(3));
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(currentOffset,
+ Optional.of(10), leaderAndEpoch);
+
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+
+ // No api version info initially available
+ when(apiVersions.get(LEADER_1.idString())).thenReturn(null);
+ requestManager.validatePositionsIfNeeded();
+ assertEquals(0, requestManager.requestsToSend(), "Invalid request
count");
+ verify(subscriptionState,
never()).completeValidation(TEST_PARTITION_1);
+ verify(subscriptionState, never()).setNextAllowedRetry(any(),
anyLong());
+
+ // Api version updated, next validate positions should successfully
build the request
+
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+ requestManager.validatePositionsIfNeeded();
+ assertEquals(1, requestManager.requestsToSend(), "Invalid request
count");
+ }
+
+ private void
expectSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition
position, Node leader) {
+
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+ when(subscriptionState.position(any())).thenReturn(position, position);
+ NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+ when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
+ }
+
+ private void
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch
leaderAndEpoch) {
+ TopicPartition tp = TEST_PARTITION_1;
+ Node leader = LEADER_1;
+ OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST;
+ long offset = 5L;
+ Map<TopicPartition, OffsetAndTimestamp> expectedOffsets =
Collections.singletonMap(tp,
+ new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch));
+
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp));
+ when(subscriptionState.resetStrategy(any())).thenReturn(strategy);
+ expectSuccessfulRequest(Collections.singletonMap(tp, leader));
+
+ requestManager.resetPositionsIfNeeded();
+ assertEquals(1, requestManager.requestsToSend());
+
+ // Reset positions response with offsets
+ when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader,
leaderAndEpoch.epoch));
+ NetworkClientDelegate.PollResult pollResult =
requestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
pollResult.unsentRequests.get(0);
+ ClientResponse clientResponse = buildClientResponse(unsentRequest,
expectedOffsets);
+ clientResponse.onComplete();
+ assertTrue(unsentRequest.future().isDone());
+ assertFalse(unsentRequest.future().isCompletedExceptionally());
+ }
+
+ @Test
+ public void testResetPositionsThrowsPreviousException() {
Review Comment:
uhm those are the same two mentioned initially...do you mean a different one
maybe?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]