lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1323744388


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -455,6 +468,192 @@ public void testRequestFails_AuthenticationException() {
         assertEquals(0, requestManager.requestsToSend());
     }
 
+    @Test
+    public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet());
+        requestManager.resetPositionsIfNeeded();
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testResetPositionsMissingLeader() {
+        expectFailedRequest_MissingLeader();
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+        requestManager.resetPositionsIfNeeded();
+        verify(metadata).requestUpdate(true);
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
+        
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+        verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+    }
+
+    @Test
+    public void testResetPositionsSuccess_LeaderEpochInResponse() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(5));
+        testResetPositionsSuccessWithLeaderEpoch(leaderAndEpoch);
+        verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, 
leaderAndEpoch.epoch.get());
+    }
+
+    @Test
+    public void testValidatePositionsSuccess() {
+        int currentOffset = 5;
+        int expectedEndOffset = 100;
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(3));
+        TopicPartition tp = TEST_PARTITION_1;
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(currentOffset,
+                Optional.of(10), leaderAndEpoch);
+
+        expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend(), "Invalid request 
count");
+
+        verify(subscriptionState).setNextAllowedRetry(any(), anyLong());
+
+        // Validate positions response with end offsets
+        when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1, 
leaderAndEpoch.epoch));
+        NetworkClientDelegate.PollResult pollResult = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
+        ClientResponse clientResponse = 
buildOffsetsForLeaderEpochResponse(unsentRequest,
+                Collections.singletonList(tp), expectedEndOffset);
+        clientResponse.onComplete();
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+        verify(subscriptionState).maybeCompleteValidation(any(), any(), any());
+    }
+
+    @Test
+    public void testValidatePositionsMissingLeader() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(Node.noNode()),
+                Optional.of(5));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L,
+                Optional.of(10), leaderAndEpoch);
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+        when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
+        requestManager.validatePositionsIfNeeded();
+        verify(metadata).requestUpdate(true);
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testValidatePositionsFailureWithUnrecoverableAuthException() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(5));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L,
+                Optional.of(10), leaderAndEpoch);
+        expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+        requestManager.validatePositionsIfNeeded();
+
+        // Validate positions response with TopicAuthorizationException
+        NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
res.unsentRequests.get(0);
+        ClientResponse clientResponse =
+                buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest, 
Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
+        clientResponse.onComplete();
+
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+        // Following validatePositions should raise the previous exception without performing any
+        // request
+        assertThrows(TopicAuthorizationException.class, () -> 
requestManager.validatePositionsIfNeeded());
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void 
testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
+        int currentOffset = 5;
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(3));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(currentOffset,
+                Optional.of(10), leaderAndEpoch);
+
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+
+        // No api version info initially available
+        when(apiVersions.get(LEADER_1.idString())).thenReturn(null);
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(0, requestManager.requestsToSend(), "Invalid request 
count");
+        verify(subscriptionState, 
never()).completeValidation(TEST_PARTITION_1);
+        verify(subscriptionState, never()).setNextAllowedRetry(any(), 
anyLong());
+
+        // Api version updated, next validate positions should successfully build the request
+        
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend(), "Invalid request 
count");
+    }
+
+    private void 
expectSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition
 position, Node leader) {
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+        when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
+    }
+
+    private void 
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch 
leaderAndEpoch) {
+        TopicPartition tp = TEST_PARTITION_1;
+        Node leader = LEADER_1;
+        OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST;
+        long offset = 5L;
+        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(tp,
+                new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch));
+        
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp));
+        when(subscriptionState.resetStrategy(any())).thenReturn(strategy);
+        expectSuccessfulRequest(Collections.singletonMap(tp, leader));
+
+        requestManager.resetPositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend());
+
+        // Reset positions response with offsets
+        when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader, 
leaderAndEpoch.epoch));
+        NetworkClientDelegate.PollResult pollResult = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
+        ClientResponse clientResponse = buildClientResponse(unsentRequest, 
expectedOffsets);
+        clientResponse.onComplete();
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+    }
+
+    @Test
+    public void testResetPositionsThrowsPreviousException() {

Review Comment:
   Sure, moved it next to the reset-related ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to