mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1474695808


##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ConsoleConsumerOptions extends CommandDefaultOptions {
+
+    private static final Random RANDOM = new Random();
+
+    private final OptionSpec<String> topicOpt;
+    private final OptionSpec<String> whitelistOpt;
+    private final OptionSpec<String> includeOpt;
+    private final OptionSpec<Integer> partitionIdOpt;
+    private final OptionSpec<String> offsetOpt;
+    private final OptionSpec<String> messageFormatterOpt;
+    private final OptionSpec<String> messageFormatterArgOpt;
+    private final OptionSpec<String> messageFormatterConfigOpt;
+    private final OptionSpec<?> resetBeginningOpt;
+    private final OptionSpec<Integer> maxMessagesOpt;
+    private final OptionSpec<Integer> timeoutMsOpt;
+    private final OptionSpec<?> skipMessageOnErrorOpt;
+    private final OptionSpec<String> bootstrapServerOpt;
+    private final OptionSpec<String> keyDeserializerOpt;
+    private final OptionSpec<String> valueDeserializerOpt;
+    private final OptionSpec<?> enableSystestEventsLoggingOpt;
+    private final OptionSpec<String> isolationLevelOpt;
+    private final OptionSpec<String> groupIdOpt;
+
+    private final Properties consumerProps;
+    private final long offset;
+    private final MessageFormatter formatter;
+
+    public ConsoleConsumerOptions(String[] args) throws IOException {
+        super(args);
+        topicOpt = parser.accepts("topic", "The topic to consume on.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+        whitelistOpt = parser.accepts("whitelist",
+                        "DEPRECATED, use --include instead; ignored if 
--include specified. Regular expression specifying list of topics to include 
for consumption.")
+                .withRequiredArg()
+                .describedAs("Java regex (String)")
+                .ofType(String.class);
+        includeOpt = parser.accepts("include",
+                        "Regular expression specifying list of topics to 
include for consumption.")
+                .withRequiredArg()
+                .describedAs("Java regex (String)")
+                .ofType(String.class);
+        partitionIdOpt = parser.accepts("partition",
+                        "The partition to consume from. Consumption starts 
from the end of the partition unless '--offset' is specified.")
+                .withRequiredArg()
+                .describedAs("partition")
+                .ofType(Integer.class);
+        offsetOpt = parser.accepts("offset", "The offset to consume from (a 
non-negative number), or 'earliest' which means from beginning, or 'latest' 
which means from end")
+                .withRequiredArg()
+                .describedAs("consume offset")
+                .ofType(String.class)
+                .defaultsTo("latest");
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+                .withRequiredArg()
+                .describedAs("consumer_prop")
+                .ofType(String.class);
+        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer.config", "Consumer config properties file. Note that " 
+ consumerPropertyOpt + " takes precedence over this config.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        messageFormatterOpt = parser.accepts("formatter", "The name of a class 
to use for formatting kafka messages for display.")
+                .withRequiredArg()
+                .describedAs("class")
+                .ofType(String.class)
+                .defaultsTo(DefaultMessageFormatter.class.getName());
+        messageFormatterArgOpt = parser.accepts("property",
+                        "The properties to initialize the message formatter. 
Default properties include: \n" +
+                            " print.timestamp=true|false\n" +
+                            " print.key=true|false\n" +
+                            " print.offset=true|false\n" +
+                            " print.partition=true|false\n" +
+                            " print.headers=true|false\n" +
+                            " print.value=true|false\n" +
+                            " key.separator=<key.separator>\n" +
+                            " line.separator=<line.separator>\n" +
+                            " headers.separator=<line.separator>\n" +
+                            " null.literal=<null.literal>\n" +
+                            " key.deserializer=<key.deserializer>\n" +
+                            " value.deserializer=<value.deserializer>\n" +
+                            " header.deserializer=<header.deserializer>\n" +
+                            "\nUsers can also pass in customized properties 
for their formatter; more specifically, users can pass in properties keyed with 
'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes 
to configure their deserializers.")
+                .withRequiredArg()
+                .describedAs("prop")
+                .ofType(String.class);
+        messageFormatterConfigOpt = parser.accepts("formatter-config", "Config 
properties file to initialize the message formatter. Note that " + 
messageFormatterArgOpt + " takes precedence over this config.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+        resetBeginningOpt = parser.accepts("from-beginning", "If the consumer 
does not already have an established offset to consume from, " +
+                "start with the earliest message present in the log rather 
than the latest message.");
+        maxMessagesOpt = parser.accepts("max-messages", "The maximum number of 
messages to consume before exiting. If not set, consumption is continual.")
+                .withRequiredArg()
+                .describedAs("num_messages")
+                .ofType(Integer.class);
+        timeoutMsOpt = parser.accepts("timeout-ms", "If specified, exit if no 
message is available for consumption for the specified interval.")
+                .withRequiredArg()
+                .describedAs("timeout_ms")
+                .ofType(Integer.class);
+        skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If 
there is an error when processing a message, " +
+                "skip it instead of halt.");
+        bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The 
server(s) to connect to.")
+                .withRequiredArg()
+                .describedAs("server to connect to")
+                .ofType(String.class);
+        keyDeserializerOpt = parser.accepts("key-deserializer")
+                .withRequiredArg()
+                .describedAs("deserializer for key")
+                .ofType(String.class);
+        valueDeserializerOpt = parser.accepts("value-deserializer")
+                .withRequiredArg()
+                .describedAs("deserializer for values")
+                .ofType(String.class);
+        enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
+                "Log lifecycle events of the consumer in addition to logging 
consumed messages. (This is specific for system tests.)");
+        isolationLevelOpt = parser.accepts("isolation-level",
+                        "Set to read_committed in order to filter out 
transactional messages which are not committed. Set to read_uncommitted " +
+                                "to read all messages.")
+                .withRequiredArg()
+                .ofType(String.class)
+                .defaultsTo("read_uncommitted");
+        groupIdOpt = parser.accepts("group", "The consumer group id of the 
consumer.")
+                .withRequiredArg()
+                .describedAs("consumer group id")
+                .ofType(String.class);
+
+        try {
+            options = parser.parse(args);
+        } catch (OptionException oe) {
+            CommandLineUtils.printUsageAndExit(parser, oe.getMessage());
+        }
+
+        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
read data from Kafka topics and outputs it to standard output.");
+
+        checkRequiredArgs();
+
+        Properties consumerPropsFromFile = options.has(consumerConfigOpt)
+                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+                : new Properties();
+        Properties extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
+
+        Set<String> groupIdsProvided = 
checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
+        consumerProps = buildConsumerProps(consumerPropsFromFile, 
extraConsumerProps, groupIdsProvided);
+        offset = parseOffset();
+        formatter = buildFormatter();
+    }
+
+    private void checkRequiredArgs() {
+        List<String> topicOrFilterArgs = new 
ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
+        topicOrFilterArgs.removeIf(Objects::isNull);
+        // user need to specify value for either --topic or one of the include 
filters options (--include or --whitelist)
+        if (topicOrFilterArgs.size() != 1) {
+            CommandLineUtils.printUsageAndExit(parser, "Exactly one of 
--include/--topic is required. " +
+                    (options.has(whitelistOpt) ? "--whitelist is DEPRECATED 
use --include instead; ignored if --include specified." : ""));
+        }
+
+        if (partitionArg().isPresent()) {
+            if (!options.has(topicOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "The topic is 
required when partition is specified.");
+            }
+            if (fromBeginning() && options.has(offsetOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "Options 
from-beginning and offset cannot be specified together.");
+            }
+        } else if (options.has(offsetOpt)) {
+            CommandLineUtils.printUsageAndExit(parser, "The partition is 
required when offset is specified.");
+        }
+
+        CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+    }
+
+    private Set<String> checkConsumerGroup(Properties consumerPropsFromFile, 
Properties extraConsumerProps) {
+        // if the group id is provided in more than place (through different 
means) all values must be the same
+        Set<String> groupIdsProvided = new HashSet<>();
+        if (options.has(groupIdOpt)) {
+            groupIdsProvided.add(options.valueOf(groupIdOpt));
+        }
+
+        if (consumerPropsFromFile.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) 
{
+            groupIdsProvided.add((String) 
consumerPropsFromFile.get(ConsumerConfig.GROUP_ID_CONFIG));
+        }
+
+        if (extraConsumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            
groupIdsProvided.add(extraConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+        }
+        if (groupIdsProvided.size() > 1) {
+            CommandLineUtils.printUsageAndExit(parser, "The group ids provided 
in different places (directly using '--group', "
+                    + "via '--consumer-property', or via '--consumer.config') 
do not match. "
+                    + "Detected group ids: "
+                    + groupIdsProvided.stream().map(group -> "'" + group + 
"'").collect(Collectors.joining(", ")));
+        }
+        if (!groupIdsProvided.isEmpty() && partitionArg().isPresent()) {
+            CommandLineUtils.printUsageAndExit(parser, "Options group and 
partition cannot be specified together.");
+        }
+        return groupIdsProvided;
+    }
+
+    private Properties buildConsumerProps(Properties consumerPropsFromFile, 
Properties extraConsumerProps, Set<String> groupIdsProvided) {
+        Properties consumerProps = new Properties(consumerPropsFromFile);
+        consumerProps.putAll(extraConsumerProps);
+        setAutoOffsetResetValue(consumerProps);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer());
+        if (consumerProps.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == 
null) {
+            consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, 
"console-consumer");
+        }
+        CommandLineUtils.maybeMergeOptions(consumerProps, 
ConsumerConfig.ISOLATION_LEVEL_CONFIG, options, isolationLevelOpt);
+
+        if (groupIdsProvided.isEmpty()) {
+            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"console-consumer-" + RANDOM.nextInt(100000));
+            // By default, avoid unnecessary expansion of the coordinator 
cache since
+            // the auto-generated group and its offsets is not intended to be 
used again
+            if 
(!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+                consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
+            }
+        } else {
+            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
groupIdsProvided.iterator().next());
+        }
+        return consumerProps;
+    }
+
+    /**
+     * Used to retrieve the correct value for the consumer parameter 
'auto.offset.reset'.
+     * Order of priority is:
+     *   1. Explicitly set parameter via --consumer.property command line 
parameter
+     *   2. Explicit --from-beginning given -> 'earliest'
+     *   3. Default value of 'latest'
+     * In case both --from-beginning and an explicit value are specified an 
error is thrown if these
+     * are conflicting.
+     */
+    private void setAutoOffsetResetValue(Properties props) {
+        String earliestConfigValue = "earliest";
+        String latestConfigValue = "latest";
+
+        if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+            // auto.offset.reset parameter was specified on the command line
+            String autoResetOption = 
props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+            if (fromBeginning() && 
!earliestConfigValue.equals(autoResetOption)) {
+                // conflicting options - latest und earliest, throw an error
+                System.err.println("Can't simultaneously specify 
--from-beginning and 'auto.offset.reset=" + autoResetOption + "', " +
+                        "please remove one option");
+                Exit.exit(1);
+            }
+            // nothing to do, checking for valid parameter values happens 
later and the specified
+            // value was already copied during .putall operation
+        } else {
+            // no explicit value for auto.offset.reset was specified
+            // if --from-beginning was specified use earliest, otherwise 
default to latest
+            String autoResetOption = fromBeginning() ? earliestConfigValue : 
latestConfigValue;
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
autoResetOption);
+        }
+    }
+
+    private long parseOffset() {
+        if (options.has(offsetOpt)) {
+            switch (options.valueOf(offsetOpt).toLowerCase(Locale.ROOT)) {
+                case "earliest":
+                    return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+                case "latest":
+                    return ListOffsetsRequest.LATEST_TIMESTAMP;
+                default:
+                    String offsetString = options.valueOf(offsetOpt);
+                    try {
+                        long offset = Long.parseLong(offsetString);
+                        if (offset < 0) {
+                            invalidOffset(offsetString);
+                        }
+                        return offset;
+                    } catch (NumberFormatException nfe) {
+                        invalidOffset(offsetString);
+                    }
+            }
+        } else if (fromBeginning()) {
+            return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+        }
+        return ListOffsetsRequest.LATEST_TIMESTAMP;
+    }
+
+    private void invalidOffset(String offset) {
+        CommandLineUtils.printUsageAndExit(parser, "The provided offset value 
'" + offset + "' is incorrect. Valid values are " +
+                "'earliest', 'latest', or a non-negative long.");
+    }
+
+    private MessageFormatter buildFormatter() {
+        MessageFormatter formatter = null;

Review Comment:
   Yes it's needed otherwise the compiler complains `formatter` may not have 
been initialized as it does not understand the catch clause causes the tool to 
exit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to