philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630908222


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##########
@@ -169,14 +183,80 @@ public void testHasAnyPendingRequests() throws Exception {
         }
     }
 
+    @Test
+    public void testPropagateMetadataError() {
+        LinkedList<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
+        Metadata metadata = new Metadata(100, 100, 50000,
+                new LogContext(), new ClusterResourceListeners());
+        NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(metadata,
+                new BackgroundEventHandler(backgroundEventQueue));
+
+        String exMsg = "Test Auth Exception";
+        metadata.fatalError(new AuthenticationException(exMsg));
+        assertEquals(0, backgroundEventQueue.size());
+
+        networkClientDelegate.poll(0, time.milliseconds());
+        assertEquals(1, backgroundEventQueue.size());
+
+        ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+        assertNotNull(event);
+        assertEquals(AuthenticationException.class, event.error().getClass());
+        assertEquals(exMsg, event.error().getMessage());
+    }
+
+    @Test
+    public void testPropagateInvalidTopicMetadataError() {
+        LinkedList<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
+        Metadata metadata = new Metadata(100, 100, 50000,
+                new LogContext(), new ClusterResourceListeners());
+        NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(metadata,
+                new BackgroundEventHandler(backgroundEventQueue));
+
+        String invalidTopic = "invalid topic";
+        MetadataResponse invalidTopicResponse = 
RequestTestUtils.metadataUpdateWith("clusterId", 1,
+                Collections.singletonMap(invalidTopic, 
Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap());
+        metadata.updateWithCurrentRequestVersion(invalidTopicResponse, false, 
time.milliseconds());
+        assertEquals(0, backgroundEventQueue.size());
+
+        networkClientDelegate.poll(0, time.milliseconds());
+        assertEquals(1, backgroundEventQueue.size());
+
+        ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+        assertNotNull(event);
+        assertEquals(InvalidTopicException.class, event.error().getClass());
+        assertEquals(String.format("Invalid topics: [%s]", invalidTopic),
+                event.error().getMessage());
+    }
+
     public NetworkClientDelegate newNetworkClientDelegate() {
         LogContext logContext = new LogContext();
         Properties properties = new Properties();
         properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         properties.put(GROUP_ID_CONFIG, GROUP_ID);
         properties.put(REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
-        return new NetworkClientDelegate(this.time, new 
ConsumerConfig(properties), logContext, this.client);
+        return new NetworkClientDelegate(this.time,
+                new ConsumerConfig(properties),
+                logContext,
+                this.client,
+                this.metadata,
+                this.backgroundEventHandler);
+    }
+
+    public NetworkClientDelegate newNetworkClientDelegate(Metadata metadata,

Review Comment:
   probably not needed. you can do
   ```
   this.metadata = mock(Metadata.class)
   this.backgroundEventHandler = ...
   
   ncd = newNetworkClientDelegate()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to