junrao commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1320355597
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -154,6 +170,52 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets(
                         OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
     }

+    /**
+     * Reset offsets for all assigned partitions that require it. Offsets will be reset

Review Comment:
   If the reset policy is not set and the position needs to be reset, we need to throw an exception to the caller, right? Do we have the logic to do that?

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -455,6 +463,89 @@ public void testRequestFails_AuthenticationException() {
         assertEquals(0, requestManager.requestsToSend());
     }

+    @Test
+    public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
+        when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet());
+        requestManager.resetPositionsIfNeeded();
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testResetPositionsMissingLeader() {
+        expectFailedRequest_MissingLeader();
+        when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
+        when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
+        requestManager.resetPositionsIfNeeded();
+        verify(metadata).requestUpdate(false);
+        assertEquals(0, requestManager.requestsToSend());
+    }
+
+    @Test
+    public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
+        testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+        verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
+    }
+
+    @Test
+    public void testResetPositionsSuccess_LeaderEpochInResponse() {

Review Comment:
   Should we add some similar tests for validatePositions?
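
To make the first comment above concrete (a partition needs a reset but no reset policy is set), here is a minimal, self-contained sketch of the check being asked about. It does not use the Kafka classes: `ResetStrategy`, `NoResetPolicyException`, `resetTimestamps`, and the string partition keys are hypothetical stand-ins, and the timestamp constants are placeholder sentinels. The existing consumer signals this case with `NoOffsetForPartitionException`; whether the new `OffsetsRequestManager` path does the same is exactly the open question in the review.

```java
import java.util.HashMap;
import java.util.Map;

final class ResetPolicySketch {
    // Hypothetical stand-in for the consumer's reset strategy enum.
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    // Hypothetical stand-in for the exception surfaced to the caller.
    static final class NoResetPolicyException extends RuntimeException {
        NoResetPolicyException(String partition) {
            super("No reset policy defined for partition " + partition);
        }
    }

    // Placeholder sentinel timestamps used to ask for the earliest/latest offset.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Maps each partition needing a reset to the timestamp to look up.
    // Throws synchronously if a partition has no usable reset policy,
    // so the caller sees the failure before any request is built.
    static Map<String, Long> resetTimestamps(Map<String, ResetStrategy> partitionsNeedingReset) {
        Map<String, Long> timestamps = new HashMap<>();
        for (Map.Entry<String, ResetStrategy> entry : partitionsNeedingReset.entrySet()) {
            switch (entry.getValue()) {
                case EARLIEST:
                    timestamps.put(entry.getKey(), EARLIEST_TIMESTAMP);
                    break;
                case LATEST:
                    timestamps.put(entry.getKey(), LATEST_TIMESTAMP);
                    break;
                default:
                    throw new NoResetPolicyException(entry.getKey());
            }
        }
        return timestamps;
    }

    public static void main(String[] args) {
        Map<String, ResetStrategy> needingReset = new HashMap<>();
        needingReset.put("topic-0", ResetStrategy.NONE);
        try {
            resetTimestamps(needingReset);
        } catch (NoResetPolicyException e) {
            System.out.println("Propagated to caller: " + e.getMessage());
        }
    }
}
```

A test for the real code could follow the same shape: stub the reset strategy to return no policy for a partition needing reset, then assert that the exception reaches the caller.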
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##########
@@ -181,6 +187,89 @@ public void testListOffsetsEventIsProcessed() {
         assertTrue(applicationEventsQueue.isEmpty());
         backgroundThread.close();
     }
+
+    @Test
+    public void testResetPositionsEventIsProcessed() {
+        when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ResetPositionsApplicationEvent e = new ResetPositionsApplicationEvent();
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        assertTrue(applicationEventsQueue.isEmpty());
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testResetPositionsProcessFailure() {
+        when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        applicationEventProcessor = spy(new ApplicationEventProcessor(
+                this.backgroundEventsQueue,
+                mockRequestManagers(),
+                metadata));
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+        TopicAuthorizationException authException = new TopicAuthorizationException("Topic authorization failed");
+        doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded();

Review Comment:
   Typically, TopicAuthorizationException won't be thrown when `resetPositionsIfNeeded` is called, right? TopicAuthorizationException is thrown through the future when a response is received.
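
The distinction in the last comment (an error thrown by the call itself versus an error delivered through the future once a response arrives) can be sketched as follows. This is an illustration only, not the PR's code: `SketchRequestManager`, `AuthorizationFailure`, `onResponse`, and the `CompletableFuture<Long>` return type are hypothetical stand-ins, and the real `resetPositionsIfNeeded` may not return a future at all.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AsyncFailureSketch {
    // Hypothetical stand-in for an authorization error carried in a response.
    static final class AuthorizationFailure extends RuntimeException {
        AuthorizationFailure(String message) {
            super(message);
        }
    }

    // Hypothetical request manager: the call only enqueues work and returns;
    // the outcome is reported later, when the response is handled.
    static final class SketchRequestManager {
        private CompletableFuture<Long> pending;

        // Does not throw authorization errors: no response exists yet.
        CompletableFuture<Long> resetPositionsIfNeeded() {
            pending = new CompletableFuture<>();
            return pending;
        }

        // Invoked when the response arrives; errors complete the future exceptionally.
        void onResponse(Long offset, RuntimeException error) {
            if (error != null) {
                pending.completeExceptionally(error);
            } else {
                pending.complete(offset);
            }
        }
    }

    public static void main(String[] args) {
        SketchRequestManager manager = new SketchRequestManager();
        CompletableFuture<Long> result = manager.resetPositionsIfNeeded(); // returns normally
        manager.onResponse(null, new AuthorizationFailure("Topic authorization failed"));
        try {
            result.join();
        } catch (CompletionException e) {
            System.out.println("Failure surfaced via future: " + e.getCause().getMessage());
        }
    }
}
```

Under that model, a test that stubs the call itself with `doThrow` exercises a path the production code would rarely take; failing the response (or the future) would be closer to the real failure mode.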