This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71fdab1c5d5 MINOR: describeTopics should pass the timeout to the
describeCluster call (#20375)
71fdab1c5d5 is described below
commit 71fdab1c5d5048175ee29b7b2595344ec2eebb9b
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Tue Aug 26 13:38:53 2025 +0200
MINOR: describeTopics should pass the timeout to the describeCluster call
(#20375)
This PR ensures that describeTopics correctly propagates its timeoutMs
setting to the underlying describeCluster call. A unit test was
added to verify that the API now fails with a TimeoutException when
brokers do not respond within the configured timeout.
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 26 +++++++++++++++++++++-
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ac2d67d2022..270a7124826 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2334,7 +2334,7 @@ public class KafkaAdminClient extends AdminClient {
}
// First, we need to retrieve the node info.
- DescribeClusterResult clusterResult = describeCluster();
+ DescribeClusterResult clusterResult = describeCluster(new
DescribeClusterOptions().timeoutMs(options.timeoutMs()));
clusterResult.nodes().whenComplete(
(nodes, exception) -> {
if (exception != null) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1098078b582..3e093c5029a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -551,7 +551,8 @@ public class KafkaAdminClientTest {
* Test if admin client can be closed in the callback invoked when
* an api call completes. If calling {@link Admin#close()} in callback,
AdminClient thread hangs
*/
- @Test @Timeout(10)
+ @Test
+ @Timeout(10)
public void testCloseAdminClientInCallback() throws InterruptedException {
MockTime time = new MockTime();
AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
mockCluster(3, 0));
@@ -11668,4 +11669,27 @@ public class KafkaAdminClientTest {
.setAssignmentEpoch(1));
return data;
}
+
+ @Test
+ @Timeout(30)
+ public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws
Exception {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(
+ mockCluster(1, 0),
+ AdminClientConfig.RETRIES_CONFIG, "0",
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Not using prepareResponse is equivalent to "no brokers respond".
+ long start = System.currentTimeMillis();
+ DescribeTopicsResult result =
env.adminClient().describeTopics(List.of("test-topic"), new
DescribeTopicsOptions().timeoutMs(200));
+ Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap =
result.topicNameValues();
+ KafkaFuture<TopicDescription> topicDescription =
topicDescriptionMap.get("test-topic");
+ ExecutionException exception =
assertThrows(ExecutionException.class, topicDescription::get);
+ // Duration should be close to the 200 ms API timeout (the assertion below accepts >= 150 ms) but less
than the 30000 ms request timeout.
+ long duration = System.currentTimeMillis() - start;
+
+ assertInstanceOf(TimeoutException.class, exception.getCause());
+ assertTrue(duration >= 150L && duration < 30000);
+ }
+ }
}