Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1451#discussion_r65576345

--- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
+    private static final String OPTION_TOPIC_LONG = "topics";
+    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+    private static final String OPTION_GROUP_ID_SHORT = "g";
+    private static final String OPTION_GROUP_ID_LONG = "groupid";
+    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+    private static final String OPTION_PARTITIONS_SHORT = "p";
+    private static final String OPTION_PARTITIONS_LONG = "partitions";
+    private static final String OPTION_LEADERS_SHORT = "l";
+    private static final String OPTION_LEADERS_LONG = "leaders";
+    private static final String OPTION_ZK_SERVERS_SHORT = "z";
+    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+    public static void main (String args[]) {
+        try {
+            List<KafkaOffsetLagResult> results;
+            Options options = buildOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+            }
+            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
+                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
+                if (topics != null && topics.length > 1) {
+                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
+                            "wildcard string for matching topics is supported");
+                }
+                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
+                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
+                                OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
+                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
+                } else {
+                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
+                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
+                                " is not provided");
+                    }
+                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
+                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
+                    if (partitions.length != leaders.length) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
+                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
+                }
+                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
+            } else {
+                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
+                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
+                for (String oldOption: oldSpoutOptions) {
+                    if (commandLine.hasOption(oldOption)) {
+                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
+                    }
+                }
+                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
+                            " is not specified");
+                }
+                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+                results = getOffsetLags(newKafkaSpoutOffsetQuery);
+            }
+            System.out.print(JSONValue.toJSONString(results));
+        } catch (Exception ex) {
+            System.out.print("Unable to get offset lags for kafka. Reason: ");
+            ex.printStackTrace(System.out);
+        }
+    }
+
+    private static void printUsageAndExit (Options options, String message) {
+        System.out.println(message);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("storm-kafka-monitor ", options);
+        System.exit(1);
+    }
+
+    private static Options buildOptions () {
+        Options options = new Options();
+        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
+                "offset");
+        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
+        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
+                "consumer/spout e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
+        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
+                "old spout");
+        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
+                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
+                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " +
+                "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
+        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
+                " offsets without the topic and partition nodes");
+        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
+                "/brokers (applicable only for old kafka spout) ");
+        return options;
+    }
+
+    /**
+     *
+     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+        KafkaConsumer<String, String> consumer = null;
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        try {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
+            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
+            props.put("enable.auto.commit", "false");
+            props.put("session.timeout.ms", "30000");
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            List<TopicPartition> topicPartitionList = new ArrayList<>();
+            consumer = new KafkaConsumer<>(props);
+            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+                if (partitionInfoList != null) {
+                    for (PartitionInfo partitionInfo : partitionInfoList) {
+                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                    }
+                }
+            }
+            consumer.assign(topicPartitionList);
+            for (TopicPartition topicPartition : topicPartitionList) {
+                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
+                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
+                consumer.seekToEnd(topicPartition);
+                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
+            }
+        } finally {
+            if (consumer != null) {
+                consumer.close();
+            }
+        }
+        return result;
+    }
+
+    /**
+     *
+     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
+        if (leaders != null) {
+            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
+            Map<String, List<Integer>> topicPartitions = new HashMap<>();
+            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
+                for (TopicPartition topicPartition: entry.getValue()) {
+                    if (!topicPartitions.containsKey(topicPartition.topic())) {
+                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
+                    }
+                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
+                }
+            }
+            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
+            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
+                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
+                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
+                            (partitionOffsets.getKey()) : -1;
+                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
+                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
+                            consumerCommittedOffset, partitionOffsets.getValue());
+                    result.add(kafkaOffsetLagResult);
+                }
+            }
+        }
+        return result;
+    }
+
+    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        Map<String, List<TopicPartition>> result = new HashMap<>();
+        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
+            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
+            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
+            for (int i = 0; i < leaders.length; ++i) {
+                if (!result.containsKey(leaders[i])) {
+                    result.put(leaders[i], new ArrayList<TopicPartition>());
+                }
+                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
+            }
+        } else {
--- End diff --

When can this scenario occur? There is a lot of code in some of these methods. It would be ideal to add comments hinting at what is going on, and/or to split them into smaller methods.
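[Editorial note: judging from the option handling in main(), the else branch is presumably reached in the ZkHosts case, i.e. --old-spout was given with --zk-brokers-root-node instead of explicit --partitions/--leaders, so the leader for each partition must be discovered from ZooKeeper rather than taken from the command line. A minimal sketch of the split suggested above, reusing the StaticHosts body quoted in the diff verbatim; the helper names getLeadersFromStaticHosts and getLeadersFromZk are hypothetical, not part of the patch, and the ZooKeeper lookup is stubbed out:]

    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery query) throws Exception {
        // StaticHosts: an explicit partition -> leader mapping was supplied on the command line.
        if (query.getPartitions() != null) {
            return getLeadersFromStaticHosts(query);
        }
        // ZkHosts: no explicit mapping, so leaders must be looked up under the
        // zk brokers root node (e.g. /brokers).
        return getLeadersFromZk(query);
    }

    // Hypothetical helper holding the StaticHosts logic quoted above, unchanged.
    private static Map<String, List<TopicPartition>> getLeadersFromStaticHosts (OldKafkaSpoutOffsetQuery query) {
        Map<String, List<TopicPartition>> result = new HashMap<>();
        String[] partitions = query.getPartitions().split(",");
        String[] leaders = query.getLeaders().split(",");
        for (int i = 0; i < leaders.length; ++i) {
            if (!result.containsKey(leaders[i])) {
                result.put(leaders[i], new ArrayList<TopicPartition>());
            }
            result.get(leaders[i]).add(new TopicPartition(query.getTopic(), Integer.parseInt(partitions[i])));
        }
        return result;
    }

    // Hypothetical stub: the ZooKeeper leader lookup from the original else branch would move here.
    private static Map<String, List<TopicPartition>> getLeadersFromZk (OldKafkaSpoutOffsetQuery query) throws Exception {
        throw new UnsupportedOperationException("ZooKeeper leader lookup elided in this sketch");
    }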
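[Editorial note: for reference, hypothetical invocations implied by buildOptions(); the broker, zookeeper, topic, and znode values below are made up:]

    # new kafka spout: committed offsets fetched from the brokers for a consumer group
    storm-kafka-monitor -t topic1,topic2 -g my-consumer-group -b broker1:9092,broker2:9092

    # old kafka spout with ZkHosts: committed offsets and partition leaders fetched via zookeeper
    storm-kafka-monitor -t topic1 -o -z zk1:2181,zk2:2181 -n /spout-committed-root -r /brokers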