This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71fdab1c5d5 MINOR: describeTopics should pass the timeout to the 
describeCluster call (#20375)
71fdab1c5d5 is described below

commit 71fdab1c5d5048175ee29b7b2595344ec2eebb9b
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Tue Aug 26 13:38:53 2025 +0200

    MINOR: describeTopics should pass the timeout to the describeCluster call 
(#20375)
    
    This PR ensures that describeTopics correctly propagates its timeoutMs
    setting to the underlying describeCluster call. Integration tests were
    added to verify that the API now fails with a TimeoutException when
    brokers do not respond within the configured timeout.
    
    Reviewers: Ken Huang <[email protected]>, TengYao Chi
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 26 +++++++++++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ac2d67d2022..270a7124826 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2334,7 +2334,7 @@ public class KafkaAdminClient extends AdminClient {
         }
 
         // First, we need to retrieve the node info.
-        DescribeClusterResult clusterResult = describeCluster();
+        DescribeClusterResult clusterResult = describeCluster(new 
DescribeClusterOptions().timeoutMs(options.timeoutMs()));
         clusterResult.nodes().whenComplete(
             (nodes, exception) -> {
                 if (exception != null) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1098078b582..3e093c5029a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -551,7 +551,8 @@ public class KafkaAdminClientTest {
      * Test if admin client can be closed in the callback invoked when
      * an api call completes. If calling {@link Admin#close()} in callback, 
AdminClient thread hangs
      */
-    @Test @Timeout(10)
+    @Test
+    @Timeout(10)
     public void testCloseAdminClientInCallback() throws InterruptedException {
         MockTime time = new MockTime();
         AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
mockCluster(3, 0));
@@ -11668,4 +11669,27 @@ public class KafkaAdminClientTest {
             .setAssignmentEpoch(1));
         return data;
     }
+
+    @Test
+    @Timeout(30)
+    public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws 
Exception {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(
+            mockCluster(1, 0),
+            AdminClientConfig.RETRIES_CONFIG, "0",
+            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Not using prepareResponse is equivalent to "no brokers respond".
+            long start = System.currentTimeMillis();
+            DescribeTopicsResult result = 
env.adminClient().describeTopics(List.of("test-topic"), new 
DescribeTopicsOptions().timeoutMs(200));
+            Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = 
result.topicNameValues();
+            KafkaFuture<TopicDescription> topicDescription = 
topicDescriptionMap.get("test-topic");
+            ExecutionException exception = 
assertThrows(ExecutionException.class, topicDescription::get);
+            // Duration should be greater than or equal to 200 ms but less 
than 30000 ms.
+            long duration = System.currentTimeMillis() - start;
+
+            assertInstanceOf(TimeoutException.class, exception.getCause());
+            assertTrue(duration >= 150L && duration < 30000);
+        }
+    }
 }

Reply via email to