Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2150#discussion_r123901054

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java ---
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for topics matching the given {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+
+    private final Pattern pattern;
+
+    /**
+     * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
+     * @param pattern The Pattern to use.
+     */
+    public PatternTopicFilter(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
+            if (pattern.matcher(entry.getKey()).matches()) {
+                for (PartitionInfo partitionInfo: entry.getValue()) {
+                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                }
+            }
+        }
+        return allPartitions;
+    }
+
+    @Override
+    public String getTopicsString() {
+        return pattern.pattern();
--- End diff --

@hmcl I went and tried out storm-kafka-monitor from the https://github.com/apache/storm/pull/2174 branch, and ran into multiple issues.

* The PatternTopicFilter code throws an NPE because the spout calls getTopicsString before the Set is initialized (the method is used for logging). This prevents the topology from being submitted.
* It looks like the Kafka consumer code is not being correctly shaded into the storm-kafka-monitor jar, so the bash script didn't work for me. I'm not sure whether this was a quirk of my build, since the consumer code is present in the 1.1.0 release storm-kafka-monitor jar.
* After fixing the two other issues, I tried submitting a topology consuming the "test*" pattern from a Kafka instance where topics "test", "test-1" and "test-2" were present. The Storm UI offset lag request returns the following, no matter how long I wait after the topology has started running:

```
{"kafka-spout":{"spoutId":"kafka-spout","errorInfo":"Unable to get offset lags for kafka. Reason: org.apache.kafka.shaded.common.errors.InvalidTopicException: Topic '' is invalid\n","spoutType":"KAFKA"}}
```

That code (https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java#L91) gets the list of topics to request offset lag for from the spout's getComponentOffsets method, as far as I can tell through this call: https://github.com/apache/storm/blob/9e31509d47c4e91c1009f55c7ccf321d7d7e63aa/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java#L541. I don't think we can make the current storm-kafka-monitor implementation work for subscriptions that are not known at topology submission time, unless there is some way to update the component configuration when the subscription changes.

I'd also add that I have a few other issues with storm-kafka-monitor:

* The jar is installed along with the cluster, and depends on the Kafka version specified in Storm's root POM. Kafka guarantees backwards compatible client-server communication for one release only, so there's a potential coupling between the Storm cluster version and the Kafka version. If users want to update the Kafka version in storm-kafka-monitor, they have to rebuild that module and replace the jar in their Storm install.
* The UI integration uses the storm-kafka-monitor Bash script to start the monitoring code, in order to avoid a dependency between storm-core and storm-kafka-monitor. This prevents the UI integration from working on Windows. We could supply a Windows script as well, but then we'd need to keep the two in sync.

I think it makes sense to make the change you are suggesting simply because it makes the spout log a little nicer, but it still leaves the issue of storm-kafka-monitor not working for pattern subscriptions. It would be good if we could figure out a solution to the other issues as well. Maybe we could track offset lag through the metrics system instead, and just make the spout update it?
I'll create a separate issue for discussing storm-kafka-monitor.
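
To make the metrics idea a bit more concrete, here is a minimal sketch of the bookkeeping the spout could do itself. It assumes the spout already knows, per partition, the log end offset and its last committed offset. `OffsetLagTracker` and all of its methods are hypothetical names for illustration; they are not existing Storm or Kafka APIs, and wiring the values into Storm's metrics system is left out.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of Storm): keeps the latest known offset lag
 * per topic-partition so it can be reported by the spout instead of by an
 * external monitor that needs the topic list at submission time.
 */
class OffsetLagTracker {
    // Keyed by "topic/partition"; '/' is not a legal character in Kafka topic names,
    // so the key is unambiguous.
    private final Map<String, Long> lagByPartition = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    /** Records lag for one partition; assumed to be called by the spout after a poll or commit. */
    public void update(String topic, int partition, long logEndOffset, long committedOffset) {
        // Clamp at zero in case the two offsets were read at slightly different times.
        long lag = Math.max(0L, logEndOffset - committedOffset);
        lagByPartition.put(key(topic, partition), lag);
    }

    /** Forgets a partition, e.g. when it is no longer assigned to this spout task. */
    public void remove(String topic, int partition) {
        lagByPartition.remove(key(topic, partition));
    }

    /** Returns the last recorded lag, or 0 if the partition is unknown. */
    public long lagFor(String topic, int partition) {
        Long lag = lagByPartition.get(key(topic, partition));
        return lag == null ? 0L : lag;
    }

    /** Sum of the recorded lag across all tracked partitions. */
    public long totalLag() {
        long total = 0L;
        for (long lag : lagByPartition.values()) {
            total += lag;
        }
        return total;
    }

    /** Sorted copy of the current values, suitable for handing to a metrics reporter. */
    public Map<String, Long> snapshot() {
        return new TreeMap<>(lagByPartition);
    }

    public static void main(String[] args) {
        OffsetLagTracker tracker = new OffsetLagTracker();
        // Topic names taken from the test above; the offsets are made up.
        tracker.update("test", 0, 120L, 100L);
        tracker.update("test-1", 0, 50L, 50L);
        tracker.update("test-2", 1, 10L, 0L);
        System.out.println(tracker.snapshot());
        System.out.println("total lag: " + tracker.totalLag());
    }
}
```

Because the spout would update this as partitions are discovered, the set of topics would not need to be known when the topology is submitted, which is the part the current storm-kafka-monitor approach cannot handle for pattern subscriptions.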