This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65f789f5608 KAFKA-19626: KIP-1147 Consistency of command-line arguments for remaining CLI tools (#20431)
65f789f5608 is described below

commit 65f789f5608a010e1dcd745f51f773d45af6d4b3
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Fri Aug 29 19:04:03 2025 +0800

    KAFKA-19626: KIP-1147 Consistency of command-line arguments for remaining CLI tools (#20431)
    
    This implements [KIP-1147](https://cwiki.apache.org/confluence/x/DguWF)
    for kafka-cluster.sh, kafka-leader-election.sh and
    kafka-streams-application-reset.sh.
    
    Jira: https://issues.apache.org/jira/browse/KAFKA-19626
    
    Reviewers: Andrew Schofield <[email protected]>
---
 docs/streams/developer-guide/app-reset-tool.html   |  9 ++-
 .../java/org/apache/kafka/tools/ClusterTool.java   | 21 ++++--
 .../apache/kafka/tools/LeaderElectionCommand.java  | 27 +++++++-
 .../org/apache/kafka/tools/StreamsResetter.java    | 29 ++++++--
 .../org/apache/kafka/tools/ClusterToolTest.java    | 67 ++++++++++++++++++
 .../kafka/tools/LeaderElectionCommandTest.java     | 57 ++++++++++++++-
 .../apache/kafka/tools/ResetIntegrationTest.java   | 81 ++++++++++++++++++++++
 7 files changed, 272 insertions(+), 19 deletions(-)

diff --git a/docs/streams/developer-guide/app-reset-tool.html 
b/docs/streams/developer-guide/app-reset-tool.html
index 48a40043e70..7f299ac9941 100644
--- a/docs/streams/developer-guide/app-reset-tool.html
+++ b/docs/streams/developer-guide/app-reset-tool.html
@@ -80,9 +80,12 @@
                                          PORT2.
 --by-duration &lt;String: urls&gt;          Reset offsets to offset by 
duration from
                                         current timestamp. Format: 
&#39;PnDTnHnMnS&#39;
---config-file &lt;String: file name&gt;     Property file containing configs 
to be
-                                        passed to admin clients and embedded
-                                        consumer.
+--config-file &lt;String: file name&gt;    (Deprecated) Property file 
containing configs to be
+                                        passed to admin clients and embedded 
consumer.
+                                        This option will be removed in a 
future version.
+                                        Use --command-config instead.
+--command-config &lt;String: file name&gt; Config properties file to be passed 
to admin clients
+                                        and embedded consumer.
 --dry-run                             Display the actions that would be
                                         performed without executing the reset
                                         commands.
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java 
b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index 3048179a4bc..c9f4e1afa26 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -26,6 +26,7 @@ import org.apache.kafka.server.util.CommandLineUtils;
 
 import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import net.sourceforge.argparse4j.inf.Namespace;
 import net.sourceforge.argparse4j.inf.Subparser;
@@ -82,9 +83,13 @@ public class ClusterTool {
             connectionOptions.addArgument("--bootstrap-controller", "-C")
                     .action(store())
                     .help("A list of host/port pairs to use for establishing 
the connection to the KRaft controllers.");
-            subpparser.addArgument("--config", "-c")
+            subpparser.addArgument("--config")
                     .action(store())
-                    .help("A property file containing configurations for the 
Admin client.");
+                    .help("(DEPRECATED) A property file containing 
configurations for the Admin client. " +
+                            "This option will be removed in a future version. 
Use --command-config instead.");
+            subpparser.addArgument("--command-config", "-c")
+                    .action(store())
+                    .help("Config properties file for the Admin client.");
         }
         unregisterParser.addArgument("--id", "-i")
                 .type(Integer.class)
@@ -97,8 +102,16 @@ public class ClusterTool {
 
         Namespace namespace = parser.parseArgsOrFail(args);
         String command = namespace.getString("command");
-        String configPath = namespace.getString("config");
-        Properties properties = (configPath == null) ? new Properties() : 
Utils.loadProps(configPath);
+        String configFile = namespace.getString("config");
+        String commandConfigFile = namespace.getString("command_config");
+        if (configFile != null && commandConfigFile != null) {
+            throw new ArgumentParserException("--config and --command-config 
cannot be specified together.", parser);
+        }
+        if (configFile != null) {
+            System.out.println("Option --config has been deprecated and will 
be removed in a future version. Use --command-config instead.");
+            commandConfigFile = configFile;
+        }
+        Properties properties = (commandConfigFile != null) ? 
Utils.loadProps(commandConfigFile) : new Properties();
 
         CommandLineUtils.initializeBootstrapProperties(properties,
                 Optional.ofNullable(namespace.getString("bootstrap_server")),
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
index 2b0f9c4d7b9..4a9a8d270ea 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -101,10 +101,15 @@ public class LeaderElectionCommand {
          */
         Optional<Set<TopicPartition>> topicPartitions = 
jsonFileTopicPartitions.or(() -> singleTopicPartition);
 
-        Properties props = new Properties();
+        String commandConfigFile;
         if (commandOptions.hasAdminClientConfig()) {
-            
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+            System.out.println("Option --admin.config has been deprecated and 
will be removed in a future version. Use --command-config instead.");
+            commandConfigFile = commandOptions.getAdminClientConfig();
+        } else {
+            commandConfigFile = commandOptions.getCommandConfig();
         }
+        Properties props = (commandConfigFile != null) ? 
Utils.loadProps(commandConfigFile) : new Properties();
+
         props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer());
         if 
(!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
             props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
Integer.toString((int) timeoutMs.toMillis()));
@@ -242,7 +247,9 @@ public class LeaderElectionCommand {
 
     static class LeaderElectionCommandOptions extends CommandDefaultOptions {
         private final ArgumentAcceptingOptionSpec<String> bootstrapServer;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final ArgumentAcceptingOptionSpec<String> adminClientConfig;
+        private final ArgumentAcceptingOptionSpec<String> commandConfig;
         private final ArgumentAcceptingOptionSpec<String> pathToJsonFile;
         private final ArgumentAcceptingOptionSpec<String> topic;
         private final ArgumentAcceptingOptionSpec<Integer> partition;
@@ -260,10 +267,18 @@ public class LeaderElectionCommand {
             adminClientConfig = parser
                 .accepts(
                     "admin.config",
-                    "Configuration properties files to pass to the admin 
client")
+                    "(DEPRECATED) Configuration properties files to pass to 
the admin client. " +
+                    "This option will be removed in a future version. Use 
--command-config instead.")
                 .withRequiredArg()
                 .describedAs("config file")
                 .ofType(String.class);
+            commandConfig = parser
+                .accepts(
+                    "command-config",
+                    "Config properties file to pass to the admin client.")
+                .withRequiredArg()
+                .describedAs("Config file")
+                .ofType(String.class);
             pathToJsonFile = parser
                 .accepts(
                     "path-to-json-file",
@@ -322,6 +337,10 @@ public class LeaderElectionCommand {
             return options.valueOf(adminClientConfig);
         }
 
+        public String getCommandConfig() {
+            return options.valueOf(commandConfig);
+        }
+
         public String getTopic() {
             return options.valueOf(topic);
         }
@@ -344,6 +363,8 @@ public class LeaderElectionCommand {
                 throw new AdminCommandFailedException("Missing required 
option(s): " + String.join(", ", missingOptions));
             }
 
+            CommandLineUtils.checkInvalidArgs(parser, options, 
adminClientConfig, commandConfig);
+
             // One and only one is required: --topic, --all-topic-partitions 
or --path-to-json-file
             List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(
                 topic,
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java 
b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index 7047b77c180..c6a6704ff78 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -131,10 +131,15 @@ public class StreamsResetter {
             StreamsResetterOptions options = new StreamsResetterOptions(args);
 
             String groupId = options.applicationId();
-            Properties properties = new Properties();
-            if (options.hasCommandConfig()) {
-                properties.putAll(Utils.loadProps(options.commandConfig()));
+
+            String commandConfigFile;
+            if (options.hasConfig()) {
+                System.out.println("Option --config-file has been deprecated 
and will be removed in a future version. Use --command-config instead.");
+                commandConfigFile = options.config();
+            } else {
+                commandConfigFile = options.commandConfig();
             }
+            Properties properties = (commandConfigFile != null) ? 
Utils.loadProps(commandConfigFile) : new Properties();
 
             String bootstrapServerValue = "localhost:9092";
             if (options.hasBootstrapServer()) {
@@ -573,6 +578,8 @@ public class StreamsResetter {
         private final OptionSpec<String> fromFileOption;
         private final OptionSpec<Long> shiftByOption;
         private final OptionSpecBuilder dryRunOption;
+        @Deprecated(since = "4.2", forRemoval = true)
+        private final OptionSpec<String> configOption;
         private final OptionSpec<String> commandConfigOption;
         private final OptionSpecBuilder forceOption;
 
@@ -624,7 +631,12 @@ public class StreamsResetter {
                 .withRequiredArg()
                 .describedAs("number-of-offsets")
                 .ofType(Long.class);
-            commandConfigOption = parser.accepts("config-file", "Property file 
containing configs to be passed to admin clients and embedded consumer.")
+            configOption = parser.accepts("config-file", "(DEPRECATED) 
Property file containing configs to be passed to admin clients and embedded 
consumer. "
+                    + "This option will be removed in a future version. Use 
--command-config instead.")
+                .withRequiredArg()
+                .ofType(String.class)
+                .describedAs("file name");
+            commandConfigOption = parser.accepts("command-config", "Config 
properties file to be passed to admin clients and embedded consumer.")
                 .withRequiredArg()
                 .ofType(String.class)
                 .describedAs("file name");
@@ -648,6 +660,7 @@ public class StreamsResetter {
                 CommandLineUtils.checkInvalidArgs(parser, options, 
toLatestOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, fromFileOption, shiftByOption);
                 CommandLineUtils.checkInvalidArgs(parser, options, 
fromFileOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, toLatestOption, shiftByOption);
                 CommandLineUtils.checkInvalidArgs(parser, options, 
shiftByOption, toOffsetOption, toDatetimeOption, byDurationOption, 
toEarliestOption, toLatestOption, fromFileOption);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
configOption, commandConfigOption);
             } catch (final OptionException e) {
                 CommandLineUtils.printUsageAndExit(parser, e.getMessage());
             }
@@ -661,8 +674,12 @@ public class StreamsResetter {
             return options.valueOf(applicationIdOption);
         }
 
-        public boolean hasCommandConfig() {
-            return options.has(commandConfigOption);
+        public boolean hasConfig() {
+            return options.has(configOption);
+        }
+
+        public String config() {
+            return options.valueOf(configOption);
         }
 
         public String commandConfig() {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index 201f8e56f4e..6207cc740fd 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -17,17 +17,27 @@
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
+
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -120,6 +130,63 @@ public class ClusterToolTest {
         assertEquals("The option --include-fenced-brokers is only supported 
with --bootstrap-server option", exception.getMessage());
     }
 
+    @ClusterTest
+    public void testDeprecatedConfig(ClusterInstance clusterInstance) throws 
IOException {
+        File configFile = TestUtils.tempFile("client.id=my-client");
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+            String output = ToolsTestUtils.captureStandardOut(() ->
+                assertDoesNotThrow(() -> ClusterTool.execute(
+                    "cluster-id",
+                    "--bootstrap-server", clusterInstance.bootstrapServers(),
+                    "--config", configFile.getAbsolutePath()
+                ))
+            );
+            assertTrue(output.contains("Option --config has been deprecated 
and will be removed in a future version. Use --command-config instead."));
+
+            ArgumentCaptor<Properties> argumentCaptor = 
ArgumentCaptor.forClass(Properties.class);
+            mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+            final Properties actualProps = argumentCaptor.getValue();
+            assertEquals("my-client", 
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+        }
+    }
+
+    @ClusterTest
+    public void testCommandConfig(ClusterInstance clusterInstance) throws 
IOException {
+        File configFile = TestUtils.tempFile("client.id=my-client");
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+            String output = ToolsTestUtils.captureStandardOut(() ->
+                assertDoesNotThrow(() -> ClusterTool.execute(
+                    "cluster-id",
+                    "--bootstrap-server", clusterInstance.bootstrapServers(),
+                    "--command-config", configFile.getAbsolutePath()
+                ))
+            );
+            assertTrue(output.contains("Cluster ID: " + 
clusterInstance.clusterId()));
+
+            ArgumentCaptor<Properties> argumentCaptor = 
ArgumentCaptor.forClass(Properties.class);
+            mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+            final Properties actualProps = argumentCaptor.getValue();
+            assertEquals("my-client", 
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+        }
+    }
+
+    @ClusterTest
+    public void testCommandConfigAndDeprecatedConfigPresent(ClusterInstance 
clusterInstance) throws IOException {
+        File configFile = TestUtils.tempFile("client.id=my-client");
+
+        ArgumentParserException ex = 
assertThrows(ArgumentParserException.class, () ->
+            ClusterTool.execute(
+                "cluster-id",
+                "--bootstrap-server", clusterInstance.bootstrapServers(),
+                "--config", configFile.getAbsolutePath(),
+                "--command-config", configFile.getAbsolutePath()
+            )
+        );
+        assertEquals("--config and --command-config cannot be specified 
together.", ex.getMessage());
+    }
+
     @Test
     public void testPrintClusterId() throws Exception {
         Admin adminClient = new MockAdminClient.Builder().
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index c3623f24ad0..2e3ebf6a7bd 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.utils.Exit;
 
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
@@ -104,7 +105,32 @@ public class LeaderElectionCommandTest {
     }
 
     @ClusterTest
-    public void testAdminConfigCustomTimeouts() throws Exception {
+    public void testDeprecatedAdminConfig() throws Exception {
+        String defaultApiTimeoutMs = String.valueOf(110000);
+        String requestTimeoutMs = String.valueOf(55000);
+        Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs, 
requestTimeoutMs);
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class)) {
+            String output = ToolsTestUtils.captureStandardOut(() -> {
+                LeaderElectionCommand.mainNoExit(
+                    "--bootstrap-server", cluster.bootstrapServers(),
+                    "--election-type", "unclean", "--all-topic-partitions",
+                    "--admin.config", adminConfigPath.toString()
+                );
+            });
+            assertTrue(output.contains("Option --admin.config has been 
deprecated and will be removed in a future version. Use --command-config 
instead."));
+
+            ArgumentCaptor<Properties> argumentCaptor = 
ArgumentCaptor.forClass(Properties.class);
+            mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+            // verify that properties provided to admin client are the 
overridden properties
+            final Properties actualProps = argumentCaptor.getValue();
+            
assertEquals(actualProps.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), 
requestTimeoutMs);
+            
assertEquals(actualProps.get(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 
defaultApiTimeoutMs);
+        }
+    }
+
+    @ClusterTest
+    public void testCommandConfig() throws Exception {
         String defaultApiTimeoutMs = String.valueOf(110000);
         String requestTimeoutMs = String.valueOf(55000);
         Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs, 
requestTimeoutMs);
@@ -113,12 +139,11 @@ public class LeaderElectionCommandTest {
             assertEquals(1, LeaderElectionCommand.mainNoExit(
                 "--bootstrap-server", cluster.bootstrapServers(),
                 "--election-type", "unclean", "--all-topic-partitions",
-                "--admin.config", adminConfigPath.toString()
+                "--command-config", adminConfigPath.toString()
             ));
 
             ArgumentCaptor<Properties> argumentCaptor = 
ArgumentCaptor.forClass(Properties.class);
             mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
-
             // verify that properties provided to admin client are the 
overridden properties
             final Properties actualProps = argumentCaptor.getValue();
             
assertEquals(actualProps.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), 
requestTimeoutMs);
@@ -126,6 +151,32 @@ public class LeaderElectionCommandTest {
         }
     }
 
+    @ClusterTest
+    public void testCommandConfigAndDeprecatedConfigPresent() throws Exception 
{
+        String defaultApiTimeoutMs = String.valueOf(110000);
+        String requestTimeoutMs = String.valueOf(55000);
+        Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs, 
requestTimeoutMs);
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class)) {
+            // Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
+            Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
+
+            String output = ToolsTestUtils.captureStandardErr(() -> {
+                LeaderElectionCommand.mainNoExit(
+                    "--bootstrap-server", "localhost:9092",
+                    "--election-type", "unclean", "--all-topic-partitions",
+                    "--admin.config", adminConfigPath.toString(),
+                    "--command-config", adminConfigPath.toString()
+                );
+            });
+
+            assertTrue(output.contains(String.format("Option \"%s\" can't be 
used with option \"%s\"",
+                "[admin.config]", "[command-config]")));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
     @ClusterTest
     public void testTopicPartition() throws InterruptedException, 
ExecutionException {
         String topic = "unclean-topic";
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index b8b9f312e8c..9b881fc8ece 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.tools;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -34,6 +37,9 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -238,6 +244,81 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
         assertEquals(1, exitCode);
     }
 
+    @Test
+    public void testDeprecatedConfig(final TestInfo testInfo) throws 
IOException {
+        File configFile = TestUtils.tempFile("client.id=my-client");
+
+        final String appID = safeUniqueTestName(testInfo);
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--internal-topics", INPUT_TOPIC,
+            "--config-file", configFile.getAbsolutePath()
+        };
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+            String output = ToolsTestUtils.captureStandardOut(() -> {
+                new StreamsResetter().execute(parameters);
+            });
+            assertTrue(output.contains("Option --config-file has been 
deprecated and will be removed in a future version. Use --command-config 
instead."));
+
+            ArgumentCaptor<Properties> argumentCaptor = 
ArgumentCaptor.forClass(Properties.class);
+            mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+            final Properties actualProps = argumentCaptor.getValue();
+            assertEquals("my-client", 
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+        }
+    }
+
+    @Test
+    public void testCommandConfig(final TestInfo testInfo) throws IOException {
+        File configFile = TestUtils.tempFile("client.id=my-client");
+
+        final String appID = safeUniqueTestName(testInfo);
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--internal-topics", INPUT_TOPIC,
+            "--command-config", configFile.getAbsolutePath()
+        };
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+            new StreamsResetter().execute(parameters);
+
+            ArgumentCaptor<Properties> argumentCaptor = 
ArgumentCaptor.forClass(Properties.class);
+            mockedAdmin.verify(() -> Admin.create(argumentCaptor.capture()));
+            final Properties actualProps = argumentCaptor.getValue();
+            assertEquals("my-client", 
actualProps.get(AdminClientConfig.CLIENT_ID_CONFIG));
+        }
+    }
+
+    @Test
+    public void testCommandConfigAndDeprecatedConfigPresent(final TestInfo 
testInfo) throws IOException {
+        File configFile = TestUtils.tempFile("client.id=my-client");
+
+        final String appID = safeUniqueTestName(testInfo);
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--internal-topics", INPUT_TOPIC,
+            "--config-file", configFile.getAbsolutePath(),
+            "--command-config", configFile.getAbsolutePath()
+        };
+
+        try (final MockedStatic<Admin> mockedAdmin = 
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
+            // Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
+            Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
+
+            String output = ToolsTestUtils.captureStandardErr(() -> {
+                new StreamsResetter().execute(parameters);
+            });
+
+            assertTrue(output.contains(String.format("Option \"%s\" can't be 
used with option \"%s\"",
+                "[config-file]", "[command-config]")));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
     @Test
     public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final 
TestInfo testInfo) throws Exception {
         final String appID = safeUniqueTestName(testInfo);

Reply via email to