lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1322093959


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##########
@@ -181,6 +187,89 @@ public void testListOffsetsEventIsProcessed() {
         assertTrue(applicationEventsQueue.isEmpty());
         backgroundThread.close();
     }
+
+    @Test
+    public void testResetPositionsEventIsProcessed() {
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ResetPositionsApplicationEvent e = new 
ResetPositionsApplicationEvent();
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        assertTrue(applicationEventsQueue.isEmpty());
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testResetPositionsProcessFailure() {
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        applicationEventProcessor = spy(new ApplicationEventProcessor(
+                this.backgroundEventsQueue,
+                mockRequestManagers(),
+                metadata));
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+
+        TopicAuthorizationException authException = new 
TopicAuthorizationException("Topic authorization failed");
+        
doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded();

Review Comment:
   On this matter we kept the same behaviour as the old consumer 
implementation, mainly regarding when to throw those exceptions that may happen 
in the async calls to reset/validate. 
   
   In both consumers, Reset/validate are continuously called as part of the 
poll loop, and aligned with that usage, the `TopicAuthorizationException` is 
thrown when the `resetPositionsIfNeeded` is called, because it throws the 
exception saved from the previous reset(if any), done 
[here](https://github.com/apache/kafka/blob/39cc19c9924cd5589dc5b98b75ec8d380c159205/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L212).
 Same for the LogTruncationErrors detected when validating positions. In the 
old consumer, they are not thrown when detected. They are saved in-memory and 
thrown in the next call to `getPartitionsToValidate`, before sending the 
request.
   
   So we are keeping the same behaviour and not changing the timing of these 
errors within the poll loop. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to