lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1323351275


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -494,6 +499,109 @@ public void 
testResetPositionsSuccess_LeaderEpochInResponse() {
         verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, 
leaderAndEpoch.epoch.get());
     }
 
+    @Test
+    public void testValidatePositionsSuccess() {
+        int currentOffset = 5;
+        int expectedEndOffset = 100;
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(3));
+        TopicPartition tp = TEST_PARTITION_1;
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(currentOffset,
+                Optional.of(10), leaderAndEpoch);
+
+        expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend(), "Invalid request 
count");
+
+        verify(subscriptionState).setNextAllowedRetry(any(), anyLong());
+
+        // Validate positions response with end offsets
+        when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1, 
leaderAndEpoch.epoch));
+        NetworkClientDelegate.PollResult pollResult = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
+        ClientResponse clientResponse = 
buildOffsetsForLeaderEpochResponse(unsentRequest,
+                Collections.singletonList(tp), expectedEndOffset);
+        clientResponse.onComplete();
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+        verify(subscriptionState).maybeCompleteValidation(any(), any(), any());
+    }
+
+    @Test
+    public void testValidatePositionsMissingLeader() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(Node.noNode()),
+                Optional.of(5));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L,
+                Optional.of(10), leaderAndEpoch);
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        NodeApiVersions nodeApiVersions = NodeApiVersions.create();
+        when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
+        requestManager.validatePositionsIfNeeded();
+        verify(metadata).requestUpdate(true);
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testValidatePositionsFailureWithUnrecoverableAuthException() {
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(5));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(5L,
+                Optional.of(10), leaderAndEpoch);
+        expectSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
+
+        requestManager.validatePositionsIfNeeded();
+
+        // Validate positions response with TopicAuthorizationException
+        NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
res.unsentRequests.get(0);
+        ClientResponse clientResponse =
+                buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest, 
Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
+        clientResponse.onComplete();
+
+        assertTrue(unsentRequest.future().isDone());
+        assertFalse(unsentRequest.future().isCompletedExceptionally());
+
+        // Following validatePositions should raise the previous exception 
without performing any
+        // request
+        assertThrows(TopicAuthorizationException.class, () -> 
requestManager.validatePositionsIfNeeded());
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void 
testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
+        int currentOffset = 5;
+        Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
+                Optional.of(3));
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(currentOffset,
+                Optional.of(10), leaderAndEpoch);
+
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+
+        // No api version info initially available
+        when(apiVersions.get(LEADER_1.idString())).thenReturn(null);
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(0, requestManager.requestsToSend(), "Invalid request 
count");
+        verify(subscriptionState, 
never()).completeValidation(TEST_PARTITION_1);
+        verify(subscriptionState, never()).setNextAllowedRetry(any(), 
anyLong());
+
+        // Api version updated, next validate positions should successfully 
build the request
+        
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);
+        requestManager.validatePositionsIfNeeded();
+        assertEquals(1, requestManager.requestsToSend(), "Invalid request 
count");
+    }
+
+    private void 
expectSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition
 position, Node leader) {
+        
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.position(any())).thenReturn(position, position);

Review Comment:
   Yes, it's called twice in `getPartitionsToValidate`; see 
[here](https://github.com/apache/kafka/blob/4be174f5c1eed700411f0806a2eb391cee3f1177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L186).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to