This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ae4d308f680 KAFKA-16015: Fix custom timeouts overwritten by defaults
in LeaderElectionCommand (#15030)
ae4d308f680 is described below
commit ae4d308f6806fac398665723041ca62fca78abee
Author: Sergio Troiano <[email protected]>
AuthorDate: Fri Dec 29 10:50:26 2023 +0100
KAFKA-16015: Fix custom timeouts overwritten by defaults in
LeaderElectionCommand (#15030)
This commit fixes a bug in LeaderElectionCommand due to which custom
timeout configuration was not being respected.
Reviewers: Divij Vaidya <[email protected]>, Proven Provenzano
<[email protected]>
---
.../apache/kafka/tools/LeaderElectionCommand.java | 8 +++--
.../kafka/tools/DeleteRecordsCommandTest.java | 2 +-
.../kafka/tools/LeaderElectionCommandTest.java | 36 ++++++++++++++++++++++
3 files changed, 43 insertions(+), 3 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
index 8d34937c900..c704e418750 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -99,8 +99,12 @@ public class LeaderElectionCommand {
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
}
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
commandOptions.getBootstrapServer());
- props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG,
Long.toString(timeoutMs.toMillis()));
- props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
Long.toString(timeoutMs.toMillis() / 2));
+ if
(!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
+ props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG,
Integer.toString((int) timeoutMs.toMillis()));
+ }
+ if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
+ props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
Integer.toString((int) (timeoutMs.toMillis() / 2)));
+ }
try (Admin adminClient = Admin.create(props)) {
electLeaders(adminClient, electionType, topicPartitions);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
index 2c06fb66ba0..b540c8ffc17 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -179,4 +179,4 @@ class DeleteRecordsCommandUnitTest {
() ->
DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(jsonData)
);
}
-}
\ No newline at end of file
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index 0c9fa753f75..f68b55ed3fe 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -24,6 +24,7 @@ import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
@@ -34,6 +35,10 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.collection.JavaConverters;
+import org.mockito.MockedStatic;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -44,8 +49,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -106,6 +113,29 @@ public class LeaderElectionCommandTest {
TestUtils.assertLeader(client, topicPartition, broker3);
}
+ @ClusterTest
+ public void testAdminConfigCustomTimeouts() throws Exception {
+ String defaultApiTimeoutMs = String.valueOf(110000);
+ String requestTimeoutMs = String.valueOf(55000);
+ Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs,
requestTimeoutMs);
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class)) {
+ LeaderElectionCommand.main(
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "unclean", "--all-topic-partitions",
+ "--admin.config", adminConfigPath.toString()
+ );
+
+ ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
+ mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+
+ // verify that properties provided to admin client are the
overridden properties
+ final Properties actualProps = argumentCaptor.getValue();
+
assertEquals(actualProps.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
requestTimeoutMs);
+
assertEquals(actualProps.get(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG),
defaultApiTimeoutMs);
+ }
+ }
+
@ClusterTest
public void testTopicPartition() throws InterruptedException,
ExecutionException {
String topic = "unclean-topic";
@@ -295,4 +325,10 @@ public class LeaderElectionCommandTest {
return file.toPath();
}
+ private static Path tempAdminConfig(String defaultApiTimeoutMs, String
requestTimeoutMs) throws Exception {
+ String content = "default.api.timeout.ms=" + defaultApiTimeoutMs +
"\nrequest.timeout.ms=" + requestTimeoutMs;
+ java.io.File file = TestUtils.tempFile("admin-config", ".properties");
+ Files.write(file.toPath(), content.getBytes(StandardCharsets.UTF_8));
+ return file.toPath();
+ }
}