This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3bc50f937cb KAFKA-19623: Implement KIP-1147 for console 
producer/consumer/share-consumer. (#20479)
3bc50f937cb is described below

commit 3bc50f937cb3acd5b814587da5518d8fb5e5ec21
Author: Shivsundar R <[email protected]>
AuthorDate: Wed Sep 17 10:28:20 2025 -0400

    KAFKA-19623: Implement KIP-1147 for console 
producer/consumer/share-consumer. (#20479)
    
    *What*
    https://issues.apache.org/jira/browse/KAFKA-19623
    
    - The PR implements KIP-1147
      (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1147%3A+Improve+consistency+of+command-line+arguments)
      for the console tools, i.e. `ConsoleProducer`, `ConsoleConsumer` and
      `ConsoleShareConsumer`.
    
    - For now, the previous option names are still usable, but a warning
    message is printed stating that they are deprecated and will be
    removed in a future version.
    - I have added unit tests and also manually verified using the console
    tools that things are working as expected.
    
    Reviewers: Andrew Schofield <[email protected]>, Jhen-Yung Hsu
     <[email protected]>, Jimmy Wang
     <[email protected]>
---
 docker/examples/README.md                          |   8 +-
 docker/test/docker_sanity_test.py                  |  16 +-
 docs/security.html                                 |   4 +-
 .../KStreamAggregationIntegrationTest.java         |  14 +-
 tests/kafkatest/services/console_consumer.py       |  19 +-
 tests/kafkatest/services/console_share_consumer.py |  21 +-
 tests/kafkatest/version.py                         |  14 +
 .../org/apache/kafka/tools/ConsoleProducer.java    |  40 ++-
 .../tools/consumer/ConsoleConsumerOptions.java     |  71 +++-
 .../consumer/ConsoleShareConsumerOptions.java      |  74 +++-
 .../apache/kafka/tools/ConsoleProducerTest.java    | 113 ++++++-
 .../tools/consumer/ConsoleConsumerOptionsTest.java | 364 ++++++++++++++++----
 .../consumer/ConsoleShareConsumerOptionsTest.java  | 375 +++++++++++++++++----
 13 files changed, 938 insertions(+), 195 deletions(-)

diff --git a/docker/examples/README.md b/docker/examples/README.md
index e76247bdb54..162e27c711a 100644
--- a/docker/examples/README.md
+++ b/docker/examples/README.md
@@ -147,7 +147,7 @@ Single Node
     - To produce messages using client scripts (Ensure that java version >= 
17):
     ```
     # Run from root of the repo
-    $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:9093 --producer.config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
+    $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:9093 --command-config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
     ```
 - File Input:
     - Here ssl configs are provided via file input.
@@ -167,7 +167,7 @@ Single Node
     - To produce messages using client scripts (Ensure that java version >= 
17):
     ```
     # Run from root of the repo
-    $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:9093 --producer.config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
+    $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:9093 --command-config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
     ```
 
 Multi Node Cluster
@@ -219,7 +219,7 @@ Multi Node Cluster
         - To produce messages using client scripts (Ensure that java version 
>= 17):
         ```
         # Run from root of the repo
-        $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:29093 --producer.config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
+        $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:29093 --command-config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
         ```
 - Isolated:
     - Examples are present in `docker-compose-files/cluster/isolated` 
directory.
@@ -258,7 +258,7 @@ Multi Node Cluster
         - To produce messages using client scripts (Ensure that java version 
>= 17):
         ```
         # Run from root of the repo
-        $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:29093 --producer.config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
+        $ bin/kafka-console-producer.sh --topic test --bootstrap-server 
localhost:29093 --command-config 
./docker/examples/fixtures/client-secrets/client-ssl.properties
         ```
 
 - Note that the examples are meant to be tried one at a time, make sure you 
close an example server before trying out the other to avoid conflicts.
diff --git a/docker/test/docker_sanity_test.py 
b/docker/test/docker_sanity_test.py
index d2135fb0295..0d21bf47fee 100644
--- a/docker/test/docker_sanity_test.py
+++ b/docker/test/docker_sanity_test.py
@@ -65,7 +65,7 @@ class DockerSanityTest(unittest.TestCase):
         subprocess.run(["bash", "-c", " ".join(command)])
     
     def consume_message(self, topic, consumer_config):
-        command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", 
"--topic", topic, "--property", "'print.key=true'", "--property", 
"'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", 
f"{constants.CLIENT_TIMEOUT}"]
+        command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", 
"--topic", topic, "--formatter-property", "'print.key=true'", 
"--formatter-property", "'key.separator=:'", "--from-beginning", 
"--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"]
         command.extend(consumer_config)
         message = subprocess.check_output(["bash", "-c", " ".join(command)])
         return message.decode("utf-8").strip()
@@ -93,9 +93,9 @@ class DockerSanityTest(unittest.TestCase):
             errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
             return errors
 
-        producer_config = ["--bootstrap-server", "localhost:9092", 
"--property", "client.id=host"]
+        producer_config = ["--bootstrap-server", "localhost:9092", 
"--command-property", "client.id=host"]
         self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, 
producer_config, "key", "message")
-        consumer_config = ["--bootstrap-server", "localhost:9092", 
"--property", "auto.offset.reset=earliest"]
+        consumer_config = ["--bootstrap-server", "localhost:9092", 
"--command-property", "auto.offset.reset=earliest"]
         message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, 
consumer_config)
         try:
             self.assertEqual(message, "key:message")
@@ -129,13 +129,13 @@ class DockerSanityTest(unittest.TestCase):
             return errors
 
         producer_config = ["--bootstrap-server", ssl_broker_port,
-                           "--producer.config", 
f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]
+                           "--command-config", 
f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]
         self.produce_message(topic, producer_config, "key", "message")
 
         consumer_config = [
             "--bootstrap-server", ssl_broker_port,
-            "--property", "auto.offset.reset=earliest",
-            "--consumer.config", 
f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}",
+            "--command-property", "auto.offset.reset=earliest",
+            "--command-config", 
f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}",
         ]
         message = self.consume_message(topic, consumer_config)
         try:
@@ -155,7 +155,7 @@ class DockerSanityTest(unittest.TestCase):
             errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
             return errors
         
-        producer_config = ["--bootstrap-server", "localhost:9092", 
"--property", "client.id=host"]
+        producer_config = ["--bootstrap-server", "localhost:9092", 
"--command-property", "client.id=host"]
         self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, 
producer_config, "key", "message")
 
         print("Stopping Container")
@@ -163,7 +163,7 @@ class DockerSanityTest(unittest.TestCase):
         print("Resuming Container")
         self.resume_container()
 
-        consumer_config = ["--bootstrap-server", "localhost:9092", 
"--property", "auto.offset.reset=earliest"]
+        consumer_config = ["--bootstrap-server", "localhost:9092", 
"--command-property", "auto.offset.reset=earliest"]
         message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, 
consumer_config)
         try:
             self.assertEqual(message, "key:message")
diff --git a/docs/security.html b/docs/security.html
index 5940fc3cda6..9364a05e40a 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -510,8 +510,8 @@ ssl.key.password=test1234</code></pre>
             </ol>
             <br>
             Examples using console-producer and console-consumer:
-            <pre><code class="language-bash">$ bin/kafka-console-producer.sh 
--bootstrap-server localhost:9093 --topic test --producer.config 
client-ssl.properties
-$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--consumer.config client-ssl.properties</code></pre>
+            <pre><code class="language-bash">$ bin/kafka-console-producer.sh 
--bootstrap-server localhost:9093 --topic test --command-config 
client-ssl.properties
+$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--command-config client-ssl.properties</code></pre>
         </li>
     </ol>
     <h3 class="anchor-heading"><a id="security_sasl" 
class="anchor-link"></a><a href="#security_sasl">7.4 Authentication using 
SASL</a></h3>
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 7037ce1368d..aa283726134 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -1122,15 +1122,15 @@ public class KStreamAggregationIntegrationTest {
             final String[] args = new String[] {
                 "--bootstrap-server", CLUSTER.bootstrapServers(),
                 "--from-beginning",
-                "--property", "print.key=true",
-                "--property", "print.timestamp=" + printTimestamp,
+                "--formatter-property", "print.key=true",
+                "--formatter-property", "print.timestamp=" + printTimestamp,
                 "--topic", outputTopic,
                 "--max-messages", String.valueOf(numMessages),
-                "--property", "key.deserializer=" + 
keyDeserializer.getClass().getName(),
-                "--property", "value.deserializer=" + 
valueDeserializer.getClass().getName(),
-                "--property", "key.separator=" + keySeparator,
-                "--property", "key.deserializer." + 
TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + 
Serdes.serdeFrom(innerClass).getClass().getName(),
-                "--property", "key.deserializer.window.size.ms=500",
+                "--formatter-property", "key.deserializer=" + 
keyDeserializer.getClass().getName(),
+                "--formatter-property", "value.deserializer=" + 
valueDeserializer.getClass().getName(),
+                "--formatter-property", "key.separator=" + keySeparator,
+                "--formatter-property", "key.deserializer." + 
TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + 
Serdes.serdeFrom(innerClass).getClass().getName(),
+                "--formatter-property", "key.deserializer.window.size.ms=500",
             };
 
             ConsoleConsumer.run(new ConsoleConsumerOptions(args));
diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 9755faa1969..0f95a0f7c79 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -88,7 +88,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
             jaas_override_variables     A dict of variables to be used in the 
jaas.conf template file
             kafka_opts_override         Override parameters of the KAFKA_OPTS 
environment variable
             client_prop_file_override   Override client.properties file used 
by the consumer
-            consumer_properties         A dict of values to pass in as 
--consumer-property key=value
+            consumer_properties         A dict of values to pass in as 
--command-property key=value. For versions older than KAFKA_4_2_0, these will 
be passed as --consumer-property key=value
         """
         JmxMixin.__init__(self, num_nodes=num_nodes, 
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=ConsoleConsumer.PERSISTENT_ROOT)
@@ -163,8 +163,11 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
               "export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \
               "export KAFKA_OPTS=%(kafka_opts)s; " \
               "%(console_consumer)s " \
-              "--topic %(topic)s " \
-              "--consumer.config %(config_file)s " % args
+              "--topic %(topic)s " % args
+
+        version = get_version(node)
+        command_config_arg = "--command-config" if 
version.supports_command_config() else "--consumer.config"
+        cmd += "%s %s" % (command_config_arg, args['config_file'])
         cmd += " --bootstrap-server %(broker_list)s" % args
         cmd += " --isolation-level %s" % self.isolation_level
 
@@ -176,14 +179,15 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
             # This will be added in the properties file instead
             cmd += " --timeout-ms %s" % self.consumer_timeout_ms
 
+        formatter_property_arg = "--formatter-property" if 
version.supports_formatter_property else "--property"
         if self.print_timestamp:
-            cmd += " --property print.timestamp=true"
+            cmd += " %s print.timestamp=true" % formatter_property_arg
 
         if self.print_key:
-            cmd += " --property print.key=true"
+            cmd += " %s print.key=true" % formatter_property_arg
 
         if self.print_partition:
-            cmd += " --property print.partition=true"
+            cmd += " %s print.partition=true" % formatter_property_arg
 
         # LoggingMessageFormatter was introduced after 0.9
         if node.version > LATEST_3_7:
@@ -194,9 +198,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
         if self.enable_systest_events:
             cmd += " --enable-systest-events"
 
+        command_property_arg = "--command-property" if 
version.supports_command_property() else "--consumer-property"
         if self.consumer_properties is not None:
             for k, v in self.consumer_properties.items():
-                cmd += " --consumer-property %s=%s" % (k, v)
+                cmd += " %s %s=%s" % (command_property_arg, k, v)
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
diff --git a/tests/kafkatest/services/console_share_consumer.py 
b/tests/kafkatest/services/console_share_consumer.py
index 03fbaeaf5a5..2d7da50fe30 100644
--- a/tests/kafkatest/services/console_share_consumer.py
+++ b/tests/kafkatest/services/console_share_consumer.py
@@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import DEV_BRANCH, LATEST_4_1
+from kafkatest.version import DEV_BRANCH, LATEST_4_1, get_version
 from kafkatest.services.kafka.util import fix_opts_for_new_jvm, 
get_log4j_config_param, get_log4j_config_for_tools
 
 """
@@ -84,7 +84,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadSer
             jaas_override_variables     A dict of variables to be used in the 
jaas.conf template file
             kafka_opts_override         Override parameters of the KAFKA_OPTS 
environment variable
             client_prop_file_override   Override client.properties file used 
by the consumer
-            share_consumer_properties   A dict of values to pass in as 
--consumer-property key=value
+            share_consumer_properties   A dict of values to pass in as 
--command-property key=value. For versions older than KAFKA_4_2_0, these will 
be passed as --consumer-property key=value
         """
         JmxMixin.__init__(self, num_nodes=num_nodes, 
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=ConsoleShareConsumer.PERSISTENT_ROOT)
@@ -156,31 +156,36 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, 
JmxMixin, BackgroundThreadSer
               "export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j_config)s\"; " \
               "export KAFKA_OPTS=%(kafka_opts)s; " \
               "%(console_share_consumer)s " \
-              "--topic %(topic)s " \
-              "--consumer-config %(config_file)s " % args
+              "--topic %(topic)s " % args
+
+        version = get_version(node)
+        command_config_arg = "--command-config" if 
version.supports_command_config() else "--consumer-config"
+        cmd += "%s %s" % (command_config_arg, args['config_file'])
         cmd += " --bootstrap-server %(broker_list)s" % args
 
         if self.share_consumer_timeout_ms is not None:
             # This will be added in the properties file instead
             cmd += " --timeout-ms %s" % self.share_consumer_timeout_ms
 
+        formatter_property_arg = "--formatter-property" if 
version.supports_formatter_property else "--property"
         if self.print_timestamp:
-            cmd += " --property print.timestamp=true"
+            cmd += " %s print.timestamp=true" % formatter_property_arg
 
         if self.print_key:
-            cmd += " --property print.key=true"
+            cmd += " %s print.key=true" % formatter_property_arg
 
         if self.print_partition:
-            cmd += " --property print.partition=true"
+            cmd += " %s print.partition=true" % formatter_property_arg
 
         cmd += " --formatter 
org.apache.kafka.tools.consumer.LoggingMessageFormatter"
 
         if self.enable_systest_events:
             cmd += " --enable-systest-events"
 
+        command_property_arg = "--command-property" if 
version.supports_command_property() else "--consumer-property"
         if self.share_consumer_properties is not None:
             for k, v in self.share_consumer_properties.items():
-                cmd += " --consumer-property %s=%s" % (k, v)
+                cmd += " %s %s=%s" % (command_property_arg, k, v)
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index e4d600ac403..8114d51861e 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -104,6 +104,20 @@ class KafkaVersion(LooseVersion):
         # - For older versions, continue using --producer.config or 
--consumer.config
         return self >= V_4_2_0
 
+    def supports_command_property(self):
+        # According to KIP-1147, --producer-property and --consumer-property 
have been deprecated and will be removed in future versions
+        # For backward compatibility, we select the configuration based on 
node version:
+        # - For versions 4.2.0 and above, use --command-property
+        # - For older versions, continue using --producer-property or 
--consumer-property
+        return self >= V_4_2_0
+
+    def supports_formatter_property(self):
+        # According to KIP-1147, --property has been deprecated and will be 
removed in future versions
+        # For backward compatibility, we select the configuration based on 
node version:
+        # - For versions 4.2.0 and above, use --formatter-property
+        # - For older versions, continue using --property
+        return self >= V_4_2_0
+
 def get_version(node=None):
     """Return the version attached to the given node.
     Default to DEV_BRANCH if node or node.version is undefined (aka None)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java 
b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
index 93e81bd5a43..3e9e34f4210 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
@@ -131,8 +131,12 @@ public class ConsoleProducer {
         private final OptionSpec<Integer> socketBufferSizeOpt;
         private final OptionSpec<String> propertyOpt;
         private final OptionSpec<String> readerConfigOpt;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final OptionSpec<String> producerPropertyOpt;
+        private OptionSpec<String> commandPropertyOpt;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final OptionSpec<String> producerConfigOpt;
+        private OptionSpec<String> commandConfigOpt;
 
         public ConsoleProducerOptions(String[] args) {
             super(args);
@@ -250,11 +254,20 @@ public class ConsoleProducer {
                     .withRequiredArg()
                     .describedAs("config file")
                     .ofType(String.class);
-            producerPropertyOpt = parser.accepts("producer-property", "A 
mechanism to pass user-defined properties in the form key=value to the 
producer. ")
+            producerPropertyOpt = parser.accepts("producer-property", 
"(DEPRECATED) A mechanism to pass user-defined properties in the form key=value 
to the producer." +
+                            "This option will be removed in a future version. 
Use --command-property instead.")
                     .withRequiredArg()
                     .describedAs("producer_prop")
                     .ofType(String.class);
-            producerConfigOpt = parser.accepts("producer.config", "Producer 
config properties file. Note that " + producerPropertyOpt + " takes precedence 
over this config.")
+            commandPropertyOpt = parser.accepts("command-property", "A 
mechanism to pass user-defined properties in the form key=value to the 
producer.")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", 
"(DEPRECATED) Producer config properties file. Note that " + commandPropertyOpt 
+ " takes precedence over this config. This option will be removed in a future 
version. Use --command-config instead.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Producer 
config properties file. Note that " + commandPropertyOpt + " takes precedence 
over this config.")
                     .withRequiredArg()
                     .describedAs("config file")
                     .ofType(String.class);
@@ -273,6 +286,23 @@ public class ConsoleProducer {
 
             CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
 
+            if (options.has(commandConfigOpt) && 
options.has(producerConfigOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "Options 
--command-config and --producer.config cannot be specified together.");
+            }
+            if (options.has(commandPropertyOpt) && 
options.has(producerPropertyOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "Options 
--command-property and --producer-property cannot be specified together.");
+            }
+
+            if (options.has(producerPropertyOpt)) {
+                System.out.println("Warning: --producer-property is deprecated 
and will be removed in a future version. Use --command-property instead.");
+                commandPropertyOpt = producerPropertyOpt;
+            }
+
+            if (options.has(producerConfigOpt)) {
+                System.out.println("Warning: --producer.config is deprecated 
and will be removed in a future version. Use --command-config instead.");
+                commandConfigOpt = producerConfigOpt;
+            }
+
             try {
                 
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
             } catch (IllegalArgumentException e) {
@@ -314,11 +344,11 @@ public class ConsoleProducer {
         Properties producerProps() throws IOException {
             Properties props = new Properties();
 
-            if (options.has(producerConfigOpt)) {
-                props.putAll(loadProps(options.valueOf(producerConfigOpt)));
+            if (options.has(commandConfigOpt)) {
+                props.putAll(loadProps(options.valueOf(commandConfigOpt)));
             }
 
-            
props.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+            
props.putAll(parseKeyValueArgs(options.valuesOf(commandPropertyOpt)));
             props.put(BOOTSTRAP_SERVERS_CONFIG, 
options.valueOf(bootstrapServerOpt));
             props.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
 
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index fe33bfe6c68..abe6322fd97 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -48,7 +48,9 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
     private final OptionSpec<Integer> partitionIdOpt;
     private final OptionSpec<String> offsetOpt;
     private final OptionSpec<String> messageFormatterOpt;
-    private final OptionSpec<String> messageFormatterArgOpt;
+    @Deprecated(since = "4.2", forRemoval = true)
+    private final OptionSpec<String> messageFormatterArgOptDeprecated;
+    private OptionSpec<String> messageFormatterArgOpt;
     private final OptionSpec<String> messageFormatterConfigOpt;
     private final OptionSpec<?> resetBeginningOpt;
     private final OptionSpec<Integer> maxMessagesOpt;
@@ -66,6 +68,7 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
     private final long timeoutMs;
     private final MessageFormatter formatter;
 
+    @SuppressWarnings("deprecation")
     public ConsoleConsumerOptions(String[] args) throws IOException {
         super(args);
         topicOpt = parser.accepts("topic", "The topic to consume on.")
@@ -87,11 +90,23 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
                 .describedAs("consume offset")
                 .ofType(String.class)
                 .defaultsTo("latest");
-        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+        @Deprecated(since = "4.2", forRemoval = true)
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass 
user-defined properties in the form key=value to the consumer. " +
+                        "This option will be removed in a future version. Use 
--command-property instead.")
                 .withRequiredArg()
                 .describedAs("consumer_prop")
                 .ofType(String.class);
-        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer.config", "Consumer config properties file. Note that " 
+ consumerPropertyOpt + " takes precedence over this config.")
+        OptionSpec<String> commandPropertyOpt = 
parser.accepts("command-property", "A mechanism to pass user-defined properties 
in the form key=value to the consumer.")
+                .withRequiredArg()
+                .describedAs("consumer_prop")
+                .ofType(String.class);
+        @Deprecated(since = "4.2", forRemoval = true) 
+        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties 
file. Note that " + commandPropertyOpt + " takes precedence over this config. " 
+
+                        "This option will be removed in a future version. Use 
--command-config instead.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        OptionSpec<String> commandConfigOpt = parser.accepts("command-config", 
"Consumer config properties file. Note that " + commandPropertyOpt + " takes 
precedence over this config.")
                 .withRequiredArg()
                 .describedAs("config file")
                 .ofType(String.class);
@@ -100,7 +115,28 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
                 .describedAs("class")
                 .ofType(String.class)
                 .defaultsTo(DefaultMessageFormatter.class.getName());
-        messageFormatterArgOpt = parser.accepts("property",
+        messageFormatterArgOptDeprecated = parser.accepts("property",
+                        "(DEPRECATED) The properties to initialize the message 
formatter. Default properties include: \n" +
+                            " print.timestamp=true|false\n" +
+                            " print.key=true|false\n" +
+                            " print.offset=true|false\n" +
+                            " print.epoch=true|false\n" +
+                            " print.partition=true|false\n" +
+                            " print.headers=true|false\n" +
+                            " print.value=true|false\n" +
+                            " key.separator=<key.separator>\n" +
+                            " line.separator=<line.separator>\n" +
+                            " headers.separator=<line.separator>\n" +
+                            " null.literal=<null.literal>\n" +
+                            " key.deserializer=<key.deserializer>\n" +
+                            " value.deserializer=<value.deserializer>\n" +
+                            " header.deserializer=<header.deserializer>\n" +
+                            "\nUsers can also pass in customized properties 
for their formatter; more specifically, users can pass in properties keyed with 
'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes 
to configure their deserializers. " +
+                                "\nThis option will be removed in a future 
version. Use --formatter-property instead.")
+                .withRequiredArg()
+                .describedAs("prop")
+                .ofType(String.class);
+        messageFormatterArgOpt = parser.accepts("formatter-property",
                         "The properties to initialize the message formatter. 
Default properties include: \n" +
                             " print.timestamp=true|false\n" +
                             " print.key=true|false\n" +
@@ -170,11 +206,25 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
         CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
read data from Kafka topics and outputs it to standard output.");
 
         checkRequiredArgs();
+        if (options.has(consumerPropertyOpt) && 
options.has(commandPropertyOpt)) {
+            CommandLineUtils.printUsageAndExit(parser, "Options 
--consumer-property and --command-property cannot be specified together.");
+        }
+        if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) {
+            CommandLineUtils.printUsageAndExit(parser, "Options 
--consumer.config and --command-config cannot be specified together.");
+        }
 
-        Properties consumerPropsFromFile = options.has(consumerConfigOpt)
-                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+        if (options.has(consumerPropertyOpt)) {
+            System.out.println("Option --consumer-property is deprecated and 
will be removed in a future version. Use --command-property instead.");
+            commandPropertyOpt = consumerPropertyOpt;
+        }
+        if (options.has(consumerConfigOpt)) {
+            System.out.println("Option --consumer.config is deprecated and 
will be removed in a future version. Use --command-config instead.");
+            commandConfigOpt = consumerConfigOpt;
+        }
+        Properties consumerPropsFromFile = options.has(commandConfigOpt)
+                ? Utils.loadProps(options.valueOf(commandConfigOpt))
                 : new Properties();
-        Properties extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
+        Properties extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt));
         Set<String> groupIdsProvided = 
checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
         consumerProps = buildConsumerProps(consumerPropsFromFile, 
extraConsumerProps, groupIdsProvided);
         offset = parseOffset();
@@ -323,6 +373,13 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
             Class<?> messageFormatterClass = 
Class.forName(options.valueOf(messageFormatterOpt));
             formatter = (MessageFormatter) 
messageFormatterClass.getDeclaredConstructor().newInstance();
 
+            if (options.has(messageFormatterArgOpt) && 
options.has(messageFormatterArgOptDeprecated)) {
+                CommandLineUtils.printUsageAndExit(parser, "Options --property 
and --formatter-property cannot be specified together.");
+            }
+            if (options.has(messageFormatterArgOptDeprecated)) {
+                System.out.println("Option --property is deprecated and will 
be removed in a future version. Use --formatter-property instead.");
+                messageFormatterArgOpt = messageFormatterArgOptDeprecated;
+            }
             Properties formatterArgs = formatterArgs();
             Map<String, String> formatterConfigs = new HashMap<>();
             for (final String name : formatterArgs.stringPropertyNames()) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java
index f00407b25eb..3472d07afa5 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java
@@ -37,7 +37,9 @@ import joptsimple.OptionSpec;
 public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
     private final OptionSpec<String> messageFormatterOpt;
     private final OptionSpec<String> messageFormatterConfigOpt;
-    private final OptionSpec<String> messageFormatterArgOpt;
+    @Deprecated(since = "4.2", forRemoval = true)
+    private final OptionSpec<String> messageFormatterArgOptDeprecated;
+    private OptionSpec<String> messageFormatterArgOpt;
     private final OptionSpec<String> keyDeserializerOpt;
     private final OptionSpec<String> valueDeserializerOpt;
     private final OptionSpec<Integer> maxMessagesOpt;
@@ -52,17 +54,30 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
     private final MessageFormatter formatter;
     private final OptionSpec<?> enableSystestEventsLoggingOpt;
 
+    @SuppressWarnings("deprecation")
     public ConsoleShareConsumerOptions(String[] args) throws IOException {
         super(args);
         topicOpt = parser.accepts("topic", "The topic to consume from.")
                 .withRequiredArg()
                 .describedAs("topic")
                 .ofType(String.class);
-        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+        @Deprecated(since = "4.2", forRemoval = true)
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass 
user-defined properties in the form key=value to the consumer. " +
+                        "This option will be removed in a future version. Use 
--command-property instead.")
                 .withRequiredArg()
                 .describedAs("consumer_prop")
                 .ofType(String.class);
-        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer-config", "Consumer config properties file. Note that " 
+ consumerPropertyOpt + " takes precedence over this config.")
+        OptionSpec<String> commandPropertyOpt = 
parser.accepts("command-property", "A mechanism to pass user-defined properties 
in the form key=value to the consumer.")
+                .withRequiredArg()
+                .describedAs("consumer_prop")
+                .ofType(String.class);
+        @Deprecated(since = "4.2", forRemoval = true)
+        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer-config", "(DEPRECATED) Consumer config properties 
file. Note that " + commandPropertyOpt + " takes precedence over this config. " 
+
+                        "This option will be removed in a future version. Use 
--command-config instead.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        OptionSpec<String> commandConfigOpt = parser.accepts("command-config", 
"Consumer config properties file. Note that " + commandPropertyOpt + " takes 
precedence over this config.")
                 .withRequiredArg()
                 .describedAs("config file")
                 .ofType(String.class);
@@ -71,7 +86,29 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
                 .describedAs("class")
                 .ofType(String.class)
                 .defaultsTo(DefaultMessageFormatter.class.getName());
-        messageFormatterArgOpt = parser.accepts("property",
+        messageFormatterArgOptDeprecated = parser.accepts("property",
+                        "(DEPRECATED) The properties to initialize the message 
formatter. Default properties include: \n" +
+                                " print.timestamp=true|false\n" +
+                                " print.key=true|false\n" +
+                                " print.offset=true|false\n" +
+                                " print.delivery=true|false\n" +
+                                " print.epoch=true|false\n" +
+                                " print.partition=true|false\n" +
+                                " print.headers=true|false\n" +
+                                " print.value=true|false\n" +
+                                " key.separator=<key.separator>\n" +
+                                " line.separator=<line.separator>\n" +
+                                " headers.separator=<line.separator>\n" +
+                                " null.literal=<null.literal>\n" +
+                                " key.deserializer=<key.deserializer>\n" +
+                                " value.deserializer=<value.deserializer>\n" +
+                                " header.deserializer=<header.deserializer>\n" 
+
+                                "\nUsers can also pass in customized 
properties for their formatter; more specifically, users can pass in properties 
keyed with 'key.deserializer.', 'value.deserializer.' and 
'headers.deserializer.' prefixes to configure their deserializers. " +
+                                "\nThis option will be removed in a future 
version. Use --formatter-property instead.")
+                .withRequiredArg()
+                .describedAs("prop")
+                .ofType(String.class);
+        messageFormatterArgOpt = parser.accepts("formatter-property",
                         "The properties to initialize the message formatter. 
Default properties include: \n" +
                                 " print.timestamp=true|false\n" +
                                 " print.key=true|false\n" +
@@ -141,10 +178,26 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
             CommandLineUtils.printUsageAndExit(parser, "At most one of 
--reject and --release may be specified.");
         }
 
-        Properties consumerPropsFromFile = options.has(consumerConfigOpt)
-                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+        if (options.has(consumerPropertyOpt) && 
options.has(commandPropertyOpt)) {
+            CommandLineUtils.printUsageAndExit(parser, "Options 
--consumer-property and --command-property cannot be specified together.");
+        }
+        if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) {
+            CommandLineUtils.printUsageAndExit(parser, "Options 
--consumer-config and --command-config cannot be specified together.");
+        }
+
+        if (options.has(consumerPropertyOpt)) {
+            System.out.println("Option --consumer-property is deprecated and 
will be removed in a future version. Use --command-property instead.");
+            commandPropertyOpt = consumerPropertyOpt;
+        }
+        if (options.has(consumerConfigOpt)) {
+            System.out.println("Option --consumer-config is deprecated and 
will be removed in a future version. Use --command-config instead.");
+            commandConfigOpt = consumerConfigOpt;
+        }
+
+        Properties consumerPropsFromFile = options.has(commandConfigOpt)
+                ? Utils.loadProps(options.valueOf(commandConfigOpt))
                 : new Properties();
-        Properties extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
+        Properties extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt));
 
         Set<String> groupIdsProvided = checkShareGroup(consumerPropsFromFile, 
extraConsumerProps);
         consumerProps = buildConsumerProps(consumerPropsFromFile, 
extraConsumerProps, groupIdsProvided);
@@ -203,6 +256,13 @@ public final class ConsoleShareConsumerOptions extends CommandDefaultOptions {
             Class<?> messageFormatterClass = 
Class.forName(options.valueOf(messageFormatterOpt));
             formatter = (MessageFormatter) 
messageFormatterClass.getDeclaredConstructor().newInstance();
 
+            if (options.has(messageFormatterArgOpt) && 
options.has(messageFormatterArgOptDeprecated)) {
+                CommandLineUtils.printUsageAndExit(parser, "Options --property 
and --formatter-property cannot be specified together.");
+            }
+            if (options.has(messageFormatterArgOptDeprecated)) {
+                System.out.println("Option --property is deprecated and will 
be removed in a future version. Use --formatter-property instead.");
+                messageFormatterArgOpt = messageFormatterArgOptDeprecated;
+            }
             Properties formatterArgs = formatterArgs();
             Map<String, String> formatterConfigs = new HashMap<>();
             for (final String name : formatterArgs.stringPropertyNames()) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
index 6752aef29c7..e1a519ac6de 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
@@ -34,6 +34,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -57,11 +58,16 @@ public class ConsoleProducerTest {
         "--bootstrap-server", "localhost:1002",
         "--topic", "t3",
     };
-    private static final String[] CLIENT_ID_OVERRIDE = new String[]{
+    private static final String[] CLIENT_ID_OVERRIDE_DEPRECATED = new String[]{
         "--bootstrap-server", "localhost:1001",
         "--topic", "t3",
         "--producer-property", "client.id=producer-1"
     };
+    private static final String[] CLIENT_ID_OVERRIDE = new String[]{
+        "--bootstrap-server", "localhost:1001",
+        "--topic", "t3",
+        "--command-property", "client.id=producer-1"
+    };
     private static final String[] 
BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{
         "--bootstrap-server", "localhost:1002",
         "--topic", "t3",
@@ -151,8 +157,8 @@ public class ConsoleProducerTest {
     }
 
     @Test
-    public void testClientIdOverride() throws IOException {
-        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
+    public void testClientIdOverrideDeprecated() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(CLIENT_ID_OVERRIDE_DEPRECATED);
         ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
 
         assertEquals("producer-1", 
producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
@@ -222,6 +228,107 @@ public class ConsoleProducerTest {
         assertEquals(1, reader.closeCount());
     }
 
+    @Test
+    public void shouldExitOnBothProducerPropertyAndCommandProperty() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--producer-property", "acks=all",
+            "--command-property", "batch.size=16384"
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleProducerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldExitOnBothProducerConfigAndCommandConfig() throws 
IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("acks", "all");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        Map<String, String> configs2 = new HashMap<>();
+        configs2.put("batch.size", "16384");
+        File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--producer.config", propsFile.getAbsolutePath(),
+            "--command-config", propsFile2.getAbsolutePath()
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleProducerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testClientIdOverrideUsingCommandProperty() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals("producer-1", 
producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testProducerConfigFromFileUsingCommandConfig() throws 
IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("acks", "all");
+        configs.put("batch.size", "32768");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        // "all" gets converted to "-1" internally by ProducerConfig
+        assertEquals("-1", 
producerConfig.getString(ProducerConfig.ACKS_CONFIG));
+        assertEquals(32768, 
producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testCommandPropertyOverridesConfig() throws IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("acks", "1");
+        configs.put("batch.size", "16384");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-config", propsFile.getAbsolutePath(),
+            "--command-property", "acks=all"
+        };
+
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        // Command property should override the config file value
+        // "all" gets converted to "-1" internally by ProducerConfig
+        assertEquals("-1", 
producerConfig.getString(ProducerConfig.ACKS_CONFIG));
+        // Config file value should still be present
+        assertEquals(16384, 
producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
     public static class TestRecordReader implements RecordReader {
         private int configureCount = 0;
         private int closeCount = 0;
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
index 02ded3d5ca5..4639ff63a85 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java
@@ -172,7 +172,7 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void shouldParseValidConsumerConfigWithAutoOffsetResetLatest() 
throws IOException {
+    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetLatestDeprecated() throws 
IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
@@ -189,7 +189,7 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliest() 
throws IOException {
+    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetEarliestDeprecated() throws 
IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
@@ -206,7 +206,7 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning() 
throws IOException {
+    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningDeprecated()
 throws IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
@@ -240,7 +240,7 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning() {
+    public void 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningDeprecated()
 {
         Exit.setExitProcedure((code, message) -> {
             throw new IllegalArgumentException(message);
         });
@@ -259,7 +259,7 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void shouldParseConfigsFromFile() throws IOException {
+    public void shouldParseConfigsFromFileDeprecated() throws IOException {
         Map<String, String> configs = new HashMap<>();
         configs.put("request.timeout.ms", "1000");
         configs.put("group.id", "group1");
@@ -276,80 +276,82 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void groupIdsProvidedInDifferentPlacesMustMatch() throws 
IOException {
+    public void groupIdsProvidedInDifferentPlacesMustMatchDeprecated() throws 
IOException {
         Exit.setExitProcedure((code, message) -> {
             throw new IllegalArgumentException(message);
         });
+        try {
 
-        // different in all three places
-        File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
-        final String[] args = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments",
-            "--consumer-property", "group.id=group-from-properties",
-            "--consumer.config", propsFile.getAbsolutePath()
-        };
-
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+            // different in all three places
+            File propsFile = 
ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
+            final String[] args = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--consumer-property", "group.id=group-from-properties",
+                "--consumer.config", propsFile.getAbsolutePath()
+            };
 
-        // the same in all three places
-        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
-        final String[] args1 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "test-group",
-            "--consumer-property", "group.id=test-group",
-            "--consumer.config", propsFile.getAbsolutePath()
-        };
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
 
-        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
-        Properties props = config.consumerProps();
-        assertEquals("test-group", props.getProperty("group.id"));
+            // the same in all three places
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
+            final String[] args1 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "test-group",
+                "--consumer-property", "group.id=test-group",
+                "--consumer.config", propsFile.getAbsolutePath()
+            };
 
-        // different via --consumer-property and --consumer.config
-        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
-        final String[] args2 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--consumer-property", "group.id=group-from-properties",
-            "--consumer.config", propsFile.getAbsolutePath()
-        };
+            ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
+            Properties props = config.consumerProps();
+            assertEquals("test-group", props.getProperty("group.id"));
 
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args2));
+            // different via --consumer-property and --consumer.config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args2 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--consumer-property", "group.id=group-from-properties",
+                "--consumer.config", propsFile.getAbsolutePath()
+            };
 
-        // different via --consumer-property and --group
-        final String[] args3 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments",
-            "--consumer-property", "group.id=group-from-properties"
-        };
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args2));
 
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args3));
+            // different via --consumer-property and --group
+            final String[] args3 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--consumer-property", "group.id=group-from-properties"
+            };
 
-        // different via --group and --consumer.config
-        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
-        final String[] args4 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments",
-            "--consumer.config", propsFile.getAbsolutePath()
-        };
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args4));
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args3));
 
-        // via --group only
-        final String[] args5 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments"
-        };
+            // different via --group and --consumer.config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args4 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--consumer.config", propsFile.getAbsolutePath()
+            };
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args4));
 
-        config = new ConsoleConsumerOptions(args5);
-        props = config.consumerProps();
-        assertEquals("group-from-arguments", props.getProperty("group.id"));
+            // via --group only
+            final String[] args5 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments"
+            };
 
-        Exit.resetExitProcedure();
+            config = new ConsoleConsumerOptions(args5);
+            props = config.consumerProps();
+            assertEquals("group-from-arguments", 
props.getProperty("group.id"));
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
     @Test
@@ -508,7 +510,7 @@ public class ConsoleConsumerOptionsTest {
     }
 
     @Test
-    public void testClientIdOverride() throws IOException {
+    public void testClientIdOverrideDeprecated() throws IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
@@ -618,4 +620,234 @@ public class ConsoleConsumerOptionsTest {
             "--formatter", formatter,
         };
     }
+
+    @Test
+    public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer-property", "auto.offset.reset=latest",
+            "--command-property", "session.timeout.ms=10000"
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldExitOnBothConsumerConfigAndCommandConfig() throws IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        Map<String, String> configs2 = new HashMap<>();
+        configs2.put("session.timeout.ms", "10000");
+        File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer.config", propsFile.getAbsolutePath(),
+            "--command-config", propsFile2.getAbsolutePath()
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldParseValidConsumerConfigWithAutoOffsetResetLatestUsingCommandProperty() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=latest"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg().orElse(""));
+        assertFalse(config.fromBeginning());
+        assertEquals("latest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void shouldParseValidConsumerConfigWithAutoOffsetResetEarliestUsingCommandProperty() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=earliest"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg().orElse(""));
+        assertFalse(config.fromBeginning());
+        assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningUsingCommandProperty() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=earliest",
+            "--from-beginning"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg().orElse(""));
+        assertTrue(config.fromBeginning());
+        assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningUsingCommandProperty() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=latest",
+            "--from-beginning"
+        };
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldParseConfigsFromFileUsingCommandConfig() throws IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        configs.put("group.id", "group1");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        assertEquals("1000", config.consumerProps().get("request.timeout.ms"));
+        assertEquals("group1", config.consumerProps().get("group.id"));
+    }
+
+    @Test
+    public void groupIdsProvidedInDifferentPlacesMustMatchUsingCommandConfig() throws IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        try {
+            // different in all three places
+            File propsFile = 
ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
+            final String[] args = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--command-property", "group.id=group-from-properties",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+
+            // the same in all three places
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
+            final String[] args1 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "test-group",
+                "--command-property", "group.id=test-group",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+
+            ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
+            Properties props = config.consumerProps();
+            assertEquals("test-group", props.getProperty("group.id"));
+
+            // different via --command-property and --command-config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args2 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--command-property", "group.id=group-from-properties",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args2));
+
+            // different via --command-property and --group
+            final String[] args3 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--command-property", "group.id=group-from-properties"
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args3));
+
+            // different via --group and --command-config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args4 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args4));
+
+            // via --group only
+            final String[] args5 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments"
+            };
+
+            config = new ConsoleConsumerOptions(args5);
+            props = config.consumerProps();
+            assertEquals("group-from-arguments", 
props.getProperty("group.id"));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testClientIdOverrideUsingCommandProperty() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--from-beginning",
+            "--command-property", "client.id=consumer-1"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("consumer-1", 
consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java
index c2b7b73c1a5..fecf53dbbec 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.MockDeserializer;
 import org.apache.kafka.tools.ToolsTestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -32,7 +33,9 @@ import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ConsoleShareConsumerOptionsTest {
 
@@ -72,7 +75,7 @@ public class ConsoleShareConsumerOptionsTest {
     }
 
     @Test
-    public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException {
+    public void shouldParseValidConsumerConfigWithSessionTimeoutDeprecated() throws IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
@@ -88,7 +91,7 @@ public class ConsoleShareConsumerOptionsTest {
     }
 
     @Test
-    public void shouldParseConfigsFromFile() throws IOException {
+    public void shouldParseConfigsFromFileDeprecated() throws IOException {
         Map<String, String> configs = new HashMap<>();
         configs.put("request.timeout.ms", "1000");
         configs.put("group.id", "group1");
@@ -109,80 +112,82 @@ public class ConsoleShareConsumerOptionsTest {
     }
 
     @Test
-    public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException {
+    public void groupIdsProvidedInDifferentPlacesMustMatchDeprecated() throws IOException {
         Exit.setExitProcedure((code, message) -> {
             throw new IllegalArgumentException(message);
         });
 
-        // different in all three places
-        File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
-        final String[] args = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments",
-            "--consumer-property", "group.id=group-from-properties",
-            "--consumer-config", propsFile.getAbsolutePath()
-        };
-
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
-
-        // the same in all three places
-        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
-        final String[] args1 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "test-group",
-            "--consumer-property", "group.id=test-group",
-            "--consumer-config", propsFile.getAbsolutePath()
-        };
-
-        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args1);
-        Properties props = config.consumerProps();
-        assertEquals("test-group", props.getProperty("group.id"));
-
-        // different via --consumer-property and --consumer-config
-        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
-        final String[] args2 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--consumer-property", "group.id=group-from-properties",
-            "--consumer-config", propsFile.getAbsolutePath()
-        };
-
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args2));
-
-        // different via --consumer-property and --group
-        final String[] args3 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments",
-            "--consumer-property", "group.id=group-from-properties"
-        };
-
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args3));
-
-        // different via --group and --consumer-config
-        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
-        final String[] args4 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments",
-            "--consumer-config", propsFile.getAbsolutePath()
-        };
-        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args4));
-
-        // via --group only
-        final String[] args5 = new String[]{
-            "--bootstrap-server", "localhost:9092",
-            "--topic", "test",
-            "--group", "group-from-arguments"
-        };
+        try {
+            // different in all three places
+            File propsFile = 
ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
+            final String[] args = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--consumer-property", "group.id=group-from-properties",
+                "--consumer-config", propsFile.getAbsolutePath()
+            };
 
-        config = new ConsoleShareConsumerOptions(args5);
-        props = config.consumerProps();
-        assertEquals("group-from-arguments", props.getProperty("group.id"));
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
 
-        Exit.resetExitProcedure();
+            // the same in all three places
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
+            final String[] args1 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "test-group",
+                "--consumer-property", "group.id=test-group",
+                "--consumer-config", propsFile.getAbsolutePath()
+            };
+
+            ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args1);
+            Properties props = config.consumerProps();
+            assertEquals("test-group", props.getProperty("group.id"));
+
+            // different via --consumer-property and --consumer-config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args2 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--consumer-property", "group.id=group-from-properties",
+                "--consumer-config", propsFile.getAbsolutePath()
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args2));
+
+            // different via --consumer-property and --group
+            final String[] args3 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--consumer-property", "group.id=group-from-properties"
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args3));
+
+            // different via --group and --consumer-config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args4 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--consumer-config", propsFile.getAbsolutePath()
+            };
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args4));
+
+            // via --group only
+            final String[] args5 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments"
+            };
+
+            config = new ConsoleShareConsumerOptions(args5);
+            props = config.consumerProps();
+            assertEquals("group-from-arguments", 
props.getProperty("group.id"));
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
     @Test
@@ -203,7 +208,7 @@ public class ConsoleShareConsumerOptionsTest {
     }
 
     @Test
-    public void testClientIdOverride() throws IOException {
+    public void testClientIdOverrideDeprecated() throws IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
@@ -216,6 +221,56 @@ public class ConsoleShareConsumerOptionsTest {
         assertEquals("consumer-1", 
consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testCustomPropertyShouldBePassedToConfigureMethod() throws Exception {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--property", "print.key=true",
+            "--property", 
"key.deserializer=org.apache.kafka.test.MockDeserializer",
+            "--property", "key.deserializer.my-props=abc"
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+
+        assertInstanceOf(DefaultMessageFormatter.class, config.formatter());
+        
assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
+        DefaultMessageFormatter formatter = (DefaultMessageFormatter) 
config.formatter();
+        assertTrue(formatter.keyDeserializer().isPresent());
+        assertInstanceOf(MockDeserializer.class, 
formatter.keyDeserializer().get());
+        MockDeserializer keyDeserializer = (MockDeserializer) 
formatter.keyDeserializer().get();
+        assertEquals(1, keyDeserializer.configs.size());
+        assertEquals("abc", keyDeserializer.configs.get("my-props"));
+        assertTrue(keyDeserializer.isKey);
+    }
+
+    @Test
+    public void testCustomConfigShouldBePassedToConfigureMethod() throws Exception {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("key.deserializer.my-props", "abc");
+        configs.put("print.key", "false");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--property", "print.key=true",
+            "--property", 
"key.deserializer=org.apache.kafka.test.MockDeserializer",
+            "--formatter-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+
+        assertInstanceOf(DefaultMessageFormatter.class, config.formatter());
+        
assertTrue(config.formatterArgs().containsKey("key.deserializer.my-props"));
+        DefaultMessageFormatter formatter = (DefaultMessageFormatter) 
config.formatter();
+        assertTrue(formatter.keyDeserializer().isPresent());
+        assertInstanceOf(MockDeserializer.class, 
formatter.keyDeserializer().get());
+        MockDeserializer keyDeserializer = (MockDeserializer) 
formatter.keyDeserializer().get();
+        assertEquals(1, keyDeserializer.configs.size());
+        assertEquals("abc", keyDeserializer.configs.get("my-props"));
+        assertTrue(keyDeserializer.isKey);
+    }
+
     @Test
     public void testDefaultClientId() throws IOException {
         String[] args = new String[]{
@@ -271,4 +326,182 @@ public class ConsoleShareConsumerOptionsTest {
             Exit.resetExitProcedure();
         }
     }
+
+    @Test
+    public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer-property", "session.timeout.ms=10000",
+            "--command-property", "request.timeout.ms=30000"
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldExitOnBothConsumerConfigAndCommandConfig() throws IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        Map<String, String> configs2 = new HashMap<>();
+        configs2.put("session.timeout.ms", "10000");
+        File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer-config", propsFile.getAbsolutePath(),
+            "--command-config", propsFile2.getAbsolutePath()
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldParseValidConsumerConfigWithSessionTimeout() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "session.timeout.ms=10000"
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg());
+        assertEquals("10000", 
consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
+    }
+
+    @Test
+    public void shouldParseConfigsFromFile() throws IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        configs.put("group.id", "group1");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+
+        // KafkaShareConsumer uses Utils.propsToMap to convert the properties 
to a map,
+        // so using the same method to check the map has the expected values
+        Map<String, Object> configMap = 
Utils.propsToMap(config.consumerProps());
+        assertEquals("1000", configMap.get("request.timeout.ms"));
+        assertEquals("group1", configMap.get("group.id"));
+    }
+
+    @Test
+    public void groupIdsProvidedInDifferentPlacesMustMatch() throws IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        try {
+            // different in all three places
+            File propsFile = 
ToolsTestUtils.tempPropertiesFile(Map.of("group.id", "group-from-file"));
+            final String[] args = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--command-property", "group.id=group-from-properties",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
+
+            // the same in all three places
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
+            final String[] args1 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "test-group",
+                "--command-property", "group.id=test-group",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+
+            ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args1);
+            Properties props = config.consumerProps();
+            assertEquals("test-group", props.getProperty("group.id"));
+
+            // different via --command-property and --command-config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args2 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--command-property", "group.id=group-from-properties",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args2));
+
+            // different via --command-property and --group
+            final String[] args3 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--command-property", "group.id=group-from-properties"
+            };
+
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args3));
+
+            // different via --group and --command-config
+            propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+            final String[] args4 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments",
+                "--command-config", propsFile.getAbsolutePath()
+            };
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args4));
+
+            // via --group only
+            final String[] args5 = new String[]{
+                "--bootstrap-server", "localhost:9092",
+                "--topic", "test",
+                "--group", "group-from-arguments"
+            };
+
+            config = new ConsoleShareConsumerOptions(args5);
+            props = config.consumerProps();
+            assertEquals("group-from-arguments", 
props.getProperty("group.id"));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testClientIdOverride() throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "client.id=consumer-1"
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("consumer-1", 
consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
 }


Reply via email to