Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/1451#discussion_r65576345
--- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+ private static final String OPTION_TOPIC_SHORT = "t";
+ private static final String OPTION_TOPIC_LONG = "topics";
+ private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+ private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+ private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+ private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+ private static final String OPTION_GROUP_ID_SHORT = "g";
+ private static final String OPTION_GROUP_ID_LONG = "groupid";
+ private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+ private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+ private static final String OPTION_PARTITIONS_SHORT = "p";
+ private static final String OPTION_PARTITIONS_LONG = "partitions";
+ private static final String OPTION_LEADERS_SHORT = "l";
+ private static final String OPTION_LEADERS_LONG = "leaders";
+ private static final String OPTION_ZK_SERVERS_SHORT = "z";
+ private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+ private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+ private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+ private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+ private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+ public static void main (String[] args) {
+ try {
+ List<KafkaOffsetLagResult> results;
+ Options options = buildOptions();
+ CommandLineParser parser = new DefaultParser();
+ CommandLine commandLine = parser.parse(options, args);
+ if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+ printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+ }
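+ // The old spout (SimpleConsumer + ZooKeeper) and the new spout (KafkaConsumer) take mutually exclusive sets of options, validated below.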
+ if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+ OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+ if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+ printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " + OPTION_OLD_CONSUMER_LONG);
+ }
+ if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
+ printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " + OPTION_OLD_CONSUMER_LONG);
+ }
+ String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
+ if (topics != null && topics.length > 1) {
+ printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a wildcard string for matching topics is supported");
+ }
+ if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
+ if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
+ printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " + OPTION_ZK_BROKERS_ROOT_LONG);
+ }
+ oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG),
+ commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
+ } else {
+ if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
+ printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
+ }
+ if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
+ printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG + " is not provided");
+ }
+ String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
+ String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
+ if (partitions.length != leaders.length) {
+ printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
+ }
+ oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG),
+ commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue(OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
+ }
+ results = getOffsetLags(oldKafkaSpoutOffsetQuery);
+ } else {
+ String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG, OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
+ for (String oldOption: oldSpoutOptions) {
+ if (commandLine.hasOption(oldOption)) {
+ printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
+ }
+ }
+ if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+ printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG + " is not specified");
+ }
+ NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+ commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+ results = getOffsetLags(newKafkaSpoutOffsetQuery);
+ }
+ System.out.print(JSONValue.toJSONString(results));
+ } catch (Exception ex) {
+ System.out.print("Unable to get offset lags for kafka. Reason:
");
+ ex.printStackTrace(System.out);
+ }
+ }
+
+ private static void printUsageAndExit (Options options, String message) {
+ System.out.println(message);
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("storm-kafka-monitor ", options);
+ System.exit(1);
+ }
+
+ private static Options buildOptions () {
+ Options options = new Options();
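+ // commons-cli: the third argument to addOption says whether the option takes a value.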
+ options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed offset");
+ options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
+ options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new consumer/spout e.g. hostname1:9092,hostname2:9092");
+ options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout)");
+ options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in old spout");
+ options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " + OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+ options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " + OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
+ options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets and/or topic metadata for ZkHosts e.g. hostname1:2181,hostname2:2181");
+ options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed offsets without the topic and partition nodes");
+ options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. /brokers (applicable only for old kafka spout)");
+ return options;
+ }
+
+ /**
+ *
+ * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+ * @return log head offset, spout offset and lag for each partition
+ */
+ public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+ KafkaConsumer<String, String> consumer = null;
+ List<KafkaOffsetLagResult> result = new ArrayList<>();
+ try {
+ Properties props = new Properties();
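+ // Minimal consumer config; auto-commit stays off because this tool only reads offsets and never commits them.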
+ props.put("bootstrap.servers",
newKafkaSpoutOffsetQuery.getBootStrapBrokers());
+ props.put("group.id",
newKafkaSpoutOffsetQuery.getConsumerGroupId());
+ props.put("enable.auto.commit", "false");
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ List<TopicPartition> topicPartitionList = new ArrayList<>();
+ consumer = new KafkaConsumer<>(props);
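+ // Expand each requested topic into its partitions so lag can be reported per partition.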
+ for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+ List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+ if (partitionInfoList != null) {
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ }
+ consumer.assign(topicPartitionList);
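+ // committed() gives the spout's last committed offset; seekToEnd() followed by position() gives the log head offset.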
+ for (TopicPartition topicPartition : topicPartitionList) {
+ OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
+ long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
+ consumer.seekToEnd(topicPartition);
+ result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
+ }
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ return result;
+ }
+
+ /**
+ *
+ * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+ * @return log head offset, spout offset and lag for each partition
+ */
+ public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+ List<KafkaOffsetLagResult> result = new ArrayList<>();
+ Map<String, List<TopicPartition>> leaders =
getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
+ if (leaders != null) {
+ Map<String, Map<Integer, Long>> logHeadOffsets =
getLogHeadOffsets(leaders);
+ Map<String, List<Integer>> topicPartitions = new HashMap<>();
+ for (Map.Entry<String, List<TopicPartition>> entry:
leaders.entrySet()) {
+ for (TopicPartition topicPartition: entry.getValue()) {
+ if
(!topicPartitions.containsKey(topicPartition.topic())) {
+ topicPartitions.put(topicPartition.topic(), new
ArrayList<Integer>());
+ }
+
topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
+ }
+ }
+ Map<String, Map<Integer, Long>> oldConsumerOffsets =
getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
+ for (Map.Entry<String, Map<Integer, Long>> topicOffsets:
logHeadOffsets.entrySet()) {
+ for (Map.Entry<Integer, Long> partitionOffsets:
topicOffsets.getValue().entrySet()) {
+ Long consumerCommittedOffset =
oldConsumerOffsets.get(topicOffsets.getKey()) != null ?
oldConsumerOffsets.get(topicOffsets.getKey()).get
+ (partitionOffsets.getKey()) : -1;
+ consumerCommittedOffset = (consumerCommittedOffset ==
null ? -1 : consumerCommittedOffset);
+ KafkaOffsetLagResult kafkaOffsetLagResult = new
KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
+ consumerCommittedOffset,
partitionOffsets.getValue());
+ result.add(kafkaOffsetLagResult);
+ }
+ }
+ }
+ return result;
+ }
+
+ private static Map<String, List<TopicPartition>>
getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery
oldKafkaSpoutOffsetQuery) throws Exception {
+ Map<String, List<TopicPartition>> result = new HashMap<>();
+ if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
+ String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
+ String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
+ for (int i = 0; i < leaders.length; ++i) {
+ if (!result.containsKey(leaders[i])) {
+ result.put(leaders[i], new ArrayList<TopicPartition>());
+ }
+ result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
+ }
+ } else {
--- End diff ---
When can this scenario occur? There is a lot of code in some of these methods. It would be ideal to add comments hinting at what is going on, and/or to split them into smaller methods.
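
For example (just a sketch, with groupPartitionsByTopic and computeLags as hypothetical names for extracted helpers), the body of getOffsetLags(OldKafkaSpoutOffsetQuery) could read as a sequence of named steps:

    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery query) throws Exception {
        // resolve leader broker -> topic partitions (StaticHosts from CLI args, or discovery via ZK)
        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(query);
        if (leaders == null) {
            return new ArrayList<>();
        }
        // latest offset per partition, fetched from each leader
        Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
        // hypothetical helper: invert the leader map into topic -> partition ids
        Map<String, List<Integer>> topicPartitions = groupPartitionsByTopic(leaders);
        // offsets the old spout committed to ZooKeeper
        Map<String, Map<Integer, Long>> committedOffsets = getOldConsumerOffsetsFromZk(topicPartitions, query);
        // hypothetical helper: one KafkaOffsetLagResult per partition, -1 when nothing was committed
        return computeLags(logHeadOffsets, committedOffsets);
    }

Each helper then has one obvious job, and a short comment per helper would answer questions like the one above.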