Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65576345
  
    --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.monitor;
    +
    +import kafka.api.OffsetRequest;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.OffsetResponse;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Options;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.RetryOneTime;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.json.simple.JSONValue;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +/**
    + * Utility class for querying offset lag for kafka spout
    + */
    +public class KafkaOffsetLagUtil {
    +    private static final String OPTION_TOPIC_SHORT = "t";
    +    private static final String OPTION_TOPIC_LONG = "topics";
    +    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
    +    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
    +    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
    +    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
    +    private static final String OPTION_GROUP_ID_SHORT = "g";
    +    private static final String OPTION_GROUP_ID_LONG = "groupid";
    +    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
    +    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
    +    private static final String OPTION_PARTITIONS_SHORT = "p";
    +    private static final String OPTION_PARTITIONS_LONG = "partitions";
    +    private static final String OPTION_LEADERS_SHORT = "l";
    +    private static final String OPTION_LEADERS_LONG = "leaders";
    +    private static final String OPTION_ZK_SERVERS_SHORT = "z";
    +    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
    +    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
    +    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
    +    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
    +    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
    +
    +    public static void main (String args[]) {
    +        try {
    +            List<KafkaOffsetLagResult> results;
    +            Options options = buildOptions();
    +            CommandLineParser parser = new DefaultParser();
    +            CommandLine commandLine = parser.parse(options, args);
    +            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
    +                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
    +            }
    +            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
    +                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
    +                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
    +                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
    +                            OPTION_OLD_CONSUMER_LONG);
    +                }
    +                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
    +                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " +
    +                            OPTION_OLD_CONSUMER_LONG);
    +                }
    +                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
    +                if (topics != null && topics.length > 1) {
    +                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
    +                            "wildcard string for matching topics is supported");
    +                }
    +                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
    +                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
    +                                OPTION_ZK_BROKERS_ROOT_LONG);
    +                    }
    +                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
    +                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
    +                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
    +                } else {
    +                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
    +                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
    +                    }
    +                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
    +                                " is not provided");
    +                    }
    +                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
    +                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
    +                    if (partitions.length != leaders.length) {
    +                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of the same size");
    +                    }
    +                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
    +                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
    +                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
    +                }
    +                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
    +            } else {
    +                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
    +                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
    +                for (String oldOption: oldSpoutOptions) {
    +                    if (commandLine.hasOption(oldOption)) {
    +                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
    +                    }
    +                }
    +                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
    +                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
    +                            " is not specified");
    +                }
    +                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
    +                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
    +                results = getOffsetLags(newKafkaSpoutOffsetQuery);
    +            }
    +            System.out.print(JSONValue.toJSONString(results));
    +        } catch (Exception ex) {
    +            System.out.print("Unable to get offset lags for kafka. Reason: ");
    +            ex.printStackTrace(System.out);
    +        }
    +    }
    +
    +    private static void printUsageAndExit (Options options, String message) {
    +        System.out.println(message);
    +        HelpFormatter formatter = new HelpFormatter();
    +        formatter.printHelp("storm-kafka-monitor ", options);
    +        System.exit(1);
    +    }
    +
    +    private static Options buildOptions () {
    +        Options options = new Options();
    +        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
    +                "offset");
    +        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
    +        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
    +                "consumer/spout e.g. hostname1:9092,hostname2:9092");
    +        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout)");
    +        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
    +                "old spout");
    +        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
    +                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
    +        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
    +                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
    +        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " +
    +                "and/or topic metadata for ZkHosts e.g. hostname1:2181,hostname2:2181");
    +        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
    +                " offsets without the topic and partition nodes");
    +        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
    +                "/brokers (applicable only for old kafka spout)");
    +        return options;
    +    }
    +
    +    /**
    +     *
    +     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
    +     * @return log head offset, spout offset and lag for each partition
    +     */
    +    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
    +        KafkaConsumer<String, String> consumer = null;
    +        List<KafkaOffsetLagResult> result = new ArrayList<>();
    +        try {
    +            Properties props = new Properties();
    +            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
    +            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
    +            props.put("enable.auto.commit", "false");
    +            props.put("session.timeout.ms", "30000");
    +            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    +            List<TopicPartition> topicPartitionList = new ArrayList<>();
    +            consumer = new KafkaConsumer<>(props);
    +            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
    +                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
    +                if (partitionInfoList != null) {
    +                    for (PartitionInfo partitionInfo : partitionInfoList) {
    +                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    +                    }
    +                }
    +            }
    +            consumer.assign(topicPartitionList);
    +            for (TopicPartition topicPartition : topicPartitionList) {
    +                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
    +                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
    +                consumer.seekToEnd(topicPartition);
    +                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
    +            }
    +        } finally {
    +            if (consumer != null) {
    +                consumer.close();
    +            }
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     *
    +     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
    +     * @return log head offset, spout offset and lag for each partition
    +     */
    +    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
    +        List<KafkaOffsetLagResult> result = new ArrayList<>();
    +        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
    +        if (leaders != null) {
    +            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
    +            Map<String, List<Integer>> topicPartitions = new HashMap<>();
    +            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
    +                for (TopicPartition topicPartition: entry.getValue()) {
    +                    if (!topicPartitions.containsKey(topicPartition.topic())) {
    +                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
    +                    }
    +                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
    +                }
    +            }
    +            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
    +            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
    +                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
    +                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
    +                            (partitionOffsets.getKey()) : -1;
    +                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
    +                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
    +                            consumerCommittedOffset, partitionOffsets.getValue());
    +                    result.add(kafkaOffsetLagResult);
    +                }
    +            }
    +        }
    +        return result;
    +    }
    +
    +    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
    +        Map<String, List<TopicPartition>> result = new HashMap<>();
    +        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
    +            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
    +            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
    +            for (int i = 0; i < leaders.length; ++i) {
    +                if (!result.containsKey(leaders[i])) {
    +                    result.put(leaders[i], new ArrayList<TopicPartition>());
    +                }
    +                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
    +            }
    +        } else {
    --- End diff --
    
    When can this scenario occur? There is a lot of code in some of these methods. It would be ideal to add some comments hinting at what is going on, and/or to split them into smaller methods.
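    
    For example, the StaticHosts branch could be pulled into a small, named helper so that `getLeadersAndTopicPartitions` reads as the decision it is making. A rough sketch (the helper names `getLeadersFromCommandLine` and `getLeadersFromZk` are only illustrative; the latter stands in for whatever the `else` branch below currently does):
    
    ```java
    /** Maps each leader (host:port) to the topic partitions it leads, taken from the partitions/leaders command line options. */
    private static Map<String, List<TopicPartition>> getLeadersFromCommandLine (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) {
        Map<String, List<TopicPartition>> result = new HashMap<>();
        String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
        String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
        for (int i = 0; i < leaders.length; ++i) {
            if (!result.containsKey(leaders[i])) {
                result.put(leaders[i], new ArrayList<TopicPartition>());
            }
            result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
        }
        return result;
    }

    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        // StaticHosts: partitions and their leaders were given explicitly on the command line
        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
            return getLeadersFromCommandLine(oldKafkaSpoutOffsetQuery);
        }
        // ZkHosts: no explicit partition list, so discover partitions and leaders from zookeeper
        return getLeadersFromZk(oldKafkaSpoutOffsetQuery);
    }
    ```
    
    That way each branch carries a one line comment, and the `else` case this thread is about becomes a named method whose purpose is self-evident.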

