lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1322093959
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##########
@@ -181,6 +187,89 @@ public void testListOffsetsEventIsProcessed() {
         assertTrue(applicationEventsQueue.isEmpty());
         backgroundThread.close();
     }
+
+    @Test
+    public void testResetPositionsEventIsProcessed() {
+        when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ResetPositionsApplicationEvent e = new ResetPositionsApplicationEvent();
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        assertTrue(applicationEventsQueue.isEmpty());
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testResetPositionsProcessFailure() {
+        when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        applicationEventProcessor = spy(new ApplicationEventProcessor(
+                this.backgroundEventsQueue,
+                mockRequestManagers(),
+                metadata));
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+        TopicAuthorizationException authException = new TopicAuthorizationException("Topic authorization failed");
+        doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded();

Review Comment:
On this matter we kept the same behaviour as the old consumer implementation,
mainly regarding when to throw the exceptions that can occur in the async calls to reset/validate. In both consumers, reset/validate are called continuously as part of the poll loop. In line with that usage, the `TopicAuthorizationException` is thrown when `resetPositionsIfNeeded` is called, because that call throws the exception saved from the previous reset (if any), done [here](https://github.com/apache/kafka/blob/39cc19c9924cd5589dc5b98b75ec8d380c159205/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L212).

The same applies to the log truncation errors detected when validating positions. In the old consumer they are not thrown when detected; they are saved in memory and thrown on the next call to `getPartitionsToValidate`, before the request is sent. So we are keeping the same behaviour and not changing the timing of these errors within the poll loop.
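
To illustrate the timing described above, here is a minimal sketch of the "save the error, throw it on the next call" pattern. It is not the Kafka code: the class `DeferredResetSketch`, the method `onAsyncResetFailed`, and the `requestsSent` counter are hypothetical names used only for this example. Only `resetPositionsIfNeeded` mirrors a method name from the PR, and its body here is an assumption about the general shape of the behaviour.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical illustration of deferring an async failure to the next
 * poll-loop call. Not the actual OffsetFetcherUtils implementation.
 */
final class DeferredResetSketch {
    // Failure saved by the async response handler, if any.
    private final AtomicReference<RuntimeException> cachedResetException =
            new AtomicReference<>();

    // Counts how many reset requests would have been sent (for the demo only).
    private int requestsSent = 0;

    /** Called when an async reset fails; the error is saved, not thrown. */
    void onAsyncResetFailed(RuntimeException error) {
        // Keep the first unreported failure until the poll loop picks it up.
        cachedResetException.compareAndSet(null, error);
    }

    /** Called on every poll-loop iteration. */
    void resetPositionsIfNeeded() {
        // Surface the failure from the previous attempt before doing new work.
        RuntimeException previous = cachedResetException.getAndSet(null);
        if (previous != null) {
            throw previous;
        }
        // Assumption: with no pending error, a new async reset request goes out.
        requestsSent++;
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

In this sketch, a failure recorded by `onAsyncResetFailed` is only visible to the caller on the following `resetPositionsIfNeeded` call, and it is cleared once thrown, so the call after that proceeds normally.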