This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65f789f5608 KAFKA-19626: KIP-1147 Consistency of command-line
arguments for remaining CLI tools (#20431)
65f789f5608 is described below
commit 65f789f5608a010e1dcd745f51f773d45af6d4b3
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Fri Aug 29 19:04:03 2025 +0800
KAFKA-19626: KIP-1147 Consistency of command-line arguments for remaining
CLI tools (#20431)
This implements [KIP-1147](https://cwiki.apache.org/confluence/x/DguWF)
for kafka-cluster.sh, kafka-leader-election.sh and
kafka-streams-application-reset.sh.
Jira: https://issues.apache.org/jira/browse/KAFKA-19626
Reviewers: Andrew Schofield <[email protected]>
---
docs/streams/developer-guide/app-reset-tool.html | 9 ++-
.../java/org/apache/kafka/tools/ClusterTool.java | 21 ++++--
.../apache/kafka/tools/LeaderElectionCommand.java | 27 +++++++-
.../org/apache/kafka/tools/StreamsResetter.java | 29 ++++++--
.../org/apache/kafka/tools/ClusterToolTest.java | 67 ++++++++++++++++++
.../kafka/tools/LeaderElectionCommandTest.java | 57 ++++++++++++++-
.../apache/kafka/tools/ResetIntegrationTest.java | 81 ++++++++++++++++++++++
7 files changed, 272 insertions(+), 19 deletions(-)
diff --git a/docs/streams/developer-guide/app-reset-tool.html
b/docs/streams/developer-guide/app-reset-tool.html
index 48a40043e70..7f299ac9941 100644
--- a/docs/streams/developer-guide/app-reset-tool.html
+++ b/docs/streams/developer-guide/app-reset-tool.html
@@ -80,9 +80,12 @@
PORT2.
--by-duration <String: urls> Reset offsets to offset by
duration from
current timestamp. Format:
'PnDTnHnMnS'
---config-file <String: file name> Property file containing configs
to be
- passed to admin clients and embedded
- consumer.
+--config-file <String: file name> (Deprecated) Property file
containing configs to be
+ passed to admin clients and embedded
consumer.
+ This option will be removed in a
future version.
+ Use --command-config instead.
+--command-config <String: file name> Config properties file to be passed
to admin clients
+ and embedded consumer.
--dry-run Display the actions that would be
performed without executing the reset
commands.
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index 3048179a4bc..c9f4e1afa26 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -26,6 +26,7 @@ import org.apache.kafka.server.util.CommandLineUtils;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
@@ -82,9 +83,13 @@ public class ClusterTool {
connectionOptions.addArgument("--bootstrap-controller", "-C")
.action(store())
.help("A list of host/port pairs to use for establishing
the connection to the KRaft controllers.");
- subpparser.addArgument("--config", "-c")
+ subpparser.addArgument("--config")
.action(store())
- .help("A property file containing configurations for the
Admin client.");
+ .help("(DEPRECATED) A property file containing
configurations for the Admin client. " +
+ "This option will be removed in a future version.
Use --command-config instead.");
+ subpparser.addArgument("--command-config", "-c")
+ .action(store())
+ .help("Config properties file for the Admin client.");
}
unregisterParser.addArgument("--id", "-i")
.type(Integer.class)
@@ -97,8 +102,16 @@ public class ClusterTool {
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
- String configPath = namespace.getString("config");
- Properties properties = (configPath == null) ? new Properties() :
Utils.loadProps(configPath);
+ String configFile = namespace.getString("config");
+ String commandConfigFile = namespace.getString("command_config");
+ if (configFile != null && commandConfigFile != null) {
+ throw new ArgumentParserException("--config and --command-config
cannot be specified together.", parser);
+ }
+ if (configFile != null) {
+ System.out.println("Option --config has been deprecated and will
be removed in a future version. Use --command-config instead.");
+ commandConfigFile = configFile;
+ }
+ Properties properties = (commandConfigFile != null) ?
Utils.loadProps(commandConfigFile) : new Properties();
CommandLineUtils.initializeBootstrapProperties(properties,
Optional.ofNullable(namespace.getString("bootstrap_server")),
diff --git
a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
index 2b0f9c4d7b9..4a9a8d270ea 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -101,10 +101,15 @@ public class LeaderElectionCommand {
*/
Optional<Set<TopicPartition>> topicPartitions =
jsonFileTopicPartitions.or(() -> singleTopicPartition);
- Properties props = new Properties();
+ String commandConfigFile;
if (commandOptions.hasAdminClientConfig()) {
-
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+ System.out.println("Option --admin.config has been deprecated and
will be removed in a future version. Use --command-config instead.");
+ commandConfigFile = commandOptions.getAdminClientConfig();
+ } else {
+ commandConfigFile = commandOptions.getCommandConfig();
}
+ Properties props = (commandConfigFile != null) ?
Utils.loadProps(commandConfigFile) : new Properties();
+
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
commandOptions.getBootstrapServer());
if
(!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG,
Integer.toString((int) timeoutMs.toMillis()));
@@ -242,7 +247,9 @@ public class LeaderElectionCommand {
static class LeaderElectionCommandOptions extends CommandDefaultOptions {
private final ArgumentAcceptingOptionSpec<String> bootstrapServer;
+ @Deprecated(since = "4.2", forRemoval = true)
private final ArgumentAcceptingOptionSpec<String> adminClientConfig;
+ private final ArgumentAcceptingOptionSpec<String> commandConfig;
private final ArgumentAcceptingOptionSpec<String> pathToJsonFile;
private final ArgumentAcceptingOptionSpec<String> topic;
private final ArgumentAcceptingOptionSpec<Integer> partition;
@@ -260,10 +267,18 @@ public class LeaderElectionCommand {
adminClientConfig = parser
.accepts(
"admin.config",
- "Configuration properties files to pass to the admin
client")
+ "(DEPRECATED) Configuration properties files to pass to
the admin client. " +
+ "This option will be removed in a future version. Use
--command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
+ commandConfig = parser
+ .accepts(
+ "command-config",
+ "Config properties file to pass to the admin client.")
+ .withRequiredArg()
+ .describedAs("Config file")
+ .ofType(String.class);
pathToJsonFile = parser
.accepts(
"path-to-json-file",
@@ -322,6 +337,10 @@ public class LeaderElectionCommand {
return options.valueOf(adminClientConfig);
}
+ public String getCommandConfig() {
+ return options.valueOf(commandConfig);
+ }
+
public String getTopic() {
return options.valueOf(topic);
}
@@ -344,6 +363,8 @@ public class LeaderElectionCommand {
throw new AdminCommandFailedException("Missing required
option(s): " + String.join(", ", missingOptions));
}
+ CommandLineUtils.checkInvalidArgs(parser, options,
adminClientConfig, commandConfig);
+
// One and only one is required: --topic, --all-topic-partitions
or --path-to-json-file
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(
topic,
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index 7047b77c180..c6a6704ff78 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -131,10 +131,15 @@ public class StreamsResetter {
StreamsResetterOptions options = new StreamsResetterOptions(args);
String groupId = options.applicationId();
- Properties properties = new Properties();
- if (options.hasCommandConfig()) {
- properties.putAll(Utils.loadProps(options.commandConfig()));
+
+ String commandConfigFile;
+ if (options.hasConfig()) {
+ System.out.println("Option --config-file has been deprecated
and will be removed in a future version. Use --command-config instead.");
+ commandConfigFile = options.config();
+ } else {
+ commandConfigFile = options.commandConfig();
}
+ Properties properties = (commandConfigFile != null) ?
Utils.loadProps(commandConfigFile) : new Properties();
String bootstrapServerValue = "localhost:9092";
if (options.hasBootstrapServer()) {
@@ -573,6 +578,8 @@ public class StreamsResetter {
private final OptionSpec<String> fromFileOption;
private final OptionSpec<Long> shiftByOption;
private final OptionSpecBuilder dryRunOption;
+ @Deprecated(since = "4.2", forRemoval = true)
+ private final OptionSpec<String> configOption;
private final OptionSpec<String> commandConfigOption;
private final OptionSpecBuilder forceOption;
@@ -624,7 +631,12 @@ public class StreamsResetter {
.withRequiredArg()
.describedAs("number-of-offsets")
.ofType(Long.class);
- commandConfigOption = parser.accepts("config-file", "Property file
containing configs to be passed to admin clients and embedded consumer.")
+ configOption = parser.accepts("config-file", "(DEPRECATED)
Property file containing configs to be passed to admin clients and embedded
consumer. "
+ + "This option will be removed in a future version. Use
--command-config instead.")
+ .withRequiredArg()
+ .ofType(String.class)
+ .describedAs("file name");
+ commandConfigOption = parser.accepts("command-config", "Config
properties file to be passed to admin clients and embedded consumer.")
.withRequiredArg()
.ofType(String.class)
.describedAs("file name");
@@ -648,6 +660,7 @@ public class StreamsResetter {
CommandLineUtils.checkInvalidArgs(parser, options,
toLatestOption, toOffsetOption, toDatetimeOption, byDurationOption,
toEarliestOption, fromFileOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options,
fromFileOption, toOffsetOption, toDatetimeOption, byDurationOption,
toEarliestOption, toLatestOption, shiftByOption);
CommandLineUtils.checkInvalidArgs(parser, options,
shiftByOption, toOffsetOption, toDatetimeOption, byDurationOption,
toEarliestOption, toLatestOption, fromFileOption);
+ CommandLineUtils.checkInvalidArgs(parser, options,
configOption, commandConfigOption);
} catch (final OptionException e) {
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
}
@@ -661,8 +674,12 @@ public class StreamsResetter {
return options.valueOf(applicationIdOption);
}
- public boolean hasCommandConfig() {
- return options.has(commandConfigOption);
+ public boolean hasConfig() {
+ return options.has(configOption);
+ }
+
+ public String config() {
+ return options.valueOf(configOption);
}
public String commandConfig() {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index 201f8e56f4e..6207cc740fd 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -17,17 +17,27 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
+
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -120,6 +130,63 @@ public class ClusterToolTest {
assertEquals("The option --include-fenced-brokers is only supported
with --bootstrap-server option", exception.getMessage());
}
+ @ClusterTest
+ public void testDeprecatedConfig(ClusterInstance clusterInstance) throws
IOException {
+ File configFile = TestUtils.tempFile("client.id=my-client");
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+ String output = ToolsTestUtils.captureStandardOut(() ->
+ assertDoesNotThrow(() -> ClusterTool.execute(
+ "cluster-id",
+ "--bootstrap-server", clusterInstance.bootstrapServers(),
+ "--config", configFile.getAbsolutePath()
+ ))
+ );
+ assertTrue(output.contains("Option --config has been deprecated
and will be removed in a future version. Use --command-config instead."));
+
+ ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
+ mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+ final Properties actualProps = argumentCaptor.getValue();
+ assertEquals("my-client",
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+ }
+ }
+
+ @ClusterTest
+ public void testCommandConfig(ClusterInstance clusterInstance) throws
IOException {
+ File configFile = TestUtils.tempFile("client.id=my-client");
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+ String output = ToolsTestUtils.captureStandardOut(() ->
+ assertDoesNotThrow(() -> ClusterTool.execute(
+ "cluster-id",
+ "--bootstrap-server", clusterInstance.bootstrapServers(),
+ "--command-config", configFile.getAbsolutePath()
+ ))
+ );
+ assertTrue(output.contains("Cluster ID: " +
clusterInstance.clusterId()));
+
+ ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
+ mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+ final Properties actualProps = argumentCaptor.getValue();
+ assertEquals("my-client",
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+ }
+ }
+
+ @ClusterTest
+ public void testCommandConfigAndDeprecatedConfigPresent(ClusterInstance
clusterInstance) throws IOException {
+ File configFile = TestUtils.tempFile("client.id=my-client");
+
+ ArgumentParserException ex =
assertThrows(ArgumentParserException.class, () ->
+ ClusterTool.execute(
+ "cluster-id",
+ "--bootstrap-server", clusterInstance.bootstrapServers(),
+ "--config", configFile.getAbsolutePath(),
+ "--command-config", configFile.getAbsolutePath()
+ )
+ );
+ assertEquals("--config and --command-config cannot be specified
together.", ex.getMessage());
+ }
+
@Test
public void testPrintClusterId() throws Exception {
Admin adminClient = new MockAdminClient.Builder().
diff --git
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index c3623f24ad0..2e3ebf6a7bd 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.utils.Exit;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
@@ -104,7 +105,32 @@ public class LeaderElectionCommandTest {
}
@ClusterTest
- public void testAdminConfigCustomTimeouts() throws Exception {
+ public void testDeprecatedAdminConfig() throws Exception {
+ String defaultApiTimeoutMs = String.valueOf(110000);
+ String requestTimeoutMs = String.valueOf(55000);
+ Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs,
requestTimeoutMs);
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class)) {
+ String output = ToolsTestUtils.captureStandardOut(() -> {
+ LeaderElectionCommand.mainNoExit(
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "unclean", "--all-topic-partitions",
+ "--admin.config", adminConfigPath.toString()
+ );
+ });
+ assertTrue(output.contains("Option --admin.config has been
deprecated and will be removed in a future version. Use --command-config
instead."));
+
+ ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
+ mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+ // verify that properties provided to admin client are the
overridden properties
+ final Properties actualProps = argumentCaptor.getValue();
+
assertEquals(actualProps.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
requestTimeoutMs);
+
assertEquals(actualProps.get(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG),
defaultApiTimeoutMs);
+ }
+ }
+
+ @ClusterTest
+ public void testCommandConfig() throws Exception {
String defaultApiTimeoutMs = String.valueOf(110000);
String requestTimeoutMs = String.valueOf(55000);
Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs,
requestTimeoutMs);
@@ -113,12 +139,11 @@ public class LeaderElectionCommandTest {
assertEquals(1, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean", "--all-topic-partitions",
- "--admin.config", adminConfigPath.toString()
+ "--command-config", adminConfigPath.toString()
));
ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
-
// verify that properties provided to admin client are the
overridden properties
final Properties actualProps = argumentCaptor.getValue();
assertEquals(actualProps.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
requestTimeoutMs);
@@ -126,6 +151,32 @@ public class LeaderElectionCommandTest {
}
}
+ @ClusterTest
+ public void testCommandConfigAndDeprecatedConfigPresent() throws Exception
{
+ String defaultApiTimeoutMs = String.valueOf(110000);
+ String requestTimeoutMs = String.valueOf(55000);
+ Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs,
requestTimeoutMs);
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class)) {
+ // Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
+ Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
+
+ String output = ToolsTestUtils.captureStandardErr(() -> {
+ LeaderElectionCommand.mainNoExit(
+ "--bootstrap-server", "localhost:9092",
+ "--election-type", "unclean", "--all-topic-partitions",
+ "--admin.config", adminConfigPath.toString(),
+ "--command-config", adminConfigPath.toString()
+ );
+ });
+
+ assertTrue(output.contains(String.format("Option \"%s\" can't be
used with option \"%s\"",
+ "[admin.config]", "[command-config]")));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
@ClusterTest
public void testTopicPartition() throws InterruptedException,
ExecutionException {
String topic = "unclean-topic";
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index b8b9f312e8c..9b881fc8ece 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -34,6 +37,9 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.io.BufferedWriter;
import java.io.File;
@@ -238,6 +244,81 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
assertEquals(1, exitCode);
}
+ @Test
+ public void testDeprecatedConfig(final TestInfo testInfo) throws
IOException {
+ File configFile = TestUtils.tempFile("client.id=my-client");
+
+ final String appID = safeUniqueTestName(testInfo);
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--internal-topics", INPUT_TOPIC,
+ "--config-file", configFile.getAbsolutePath()
+ };
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+ String output = ToolsTestUtils.captureStandardOut(() -> {
+ new StreamsResetter().execute(parameters);
+ });
+ assertTrue(output.contains("Option --config-file has been
deprecated and will be removed in a future version. Use --command-config
instead."));
+
+ ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
+ mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+ final Properties actualProps = argumentCaptor.getValue();
+ assertEquals("my-client",
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+ }
+ }
+
+ @Test
+ public void testCommandConfig(final TestInfo testInfo) throws IOException {
+ File configFile = TestUtils.tempFile("client.id=my-client");
+
+ final String appID = safeUniqueTestName(testInfo);
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--internal-topics", INPUT_TOPIC,
+ "--command-config", configFile.getAbsolutePath()
+ };
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+ new StreamsResetter().execute(parameters);
+
+ ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
+ mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+ final Properties actualProps = argumentCaptor.getValue();
+ assertEquals("my-client",
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+ }
+ }
+
+ @Test
+ public void testCommandConfigAndDeprecatedConfigPresent(final TestInfo
testInfo) throws IOException {
+ File configFile = TestUtils.tempFile("client.id=my-client");
+
+ final String appID = safeUniqueTestName(testInfo);
+ final String[] parameters = new String[] {
+ "--application-id", appID,
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--internal-topics", INPUT_TOPIC,
+ "--config-file", configFile.getAbsolutePath(),
+ "--command-config", configFile.getAbsolutePath()
+ };
+
+ try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+ // Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
+ Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
+
+ String output = ToolsTestUtils.captureStandardErr(() -> {
+ new StreamsResetter().execute(parameters);
+ });
+
+ assertTrue(output.contains(String.format("Option \"%s\" can't be
used with option \"%s\"",
+ "[config-file]", "[command-config]")));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
@Test
public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final
TestInfo testInfo) throws Exception {
final String appID = safeUniqueTestName(testInfo);