Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2150#discussion_r123901054
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +
    +/**
    + * Filter that returns all partitions for topics matching the given {@link Pattern}.
    + */
    +public class PatternTopicFilter implements TopicFilter {
    +    
    +    private final Pattern pattern;
    +
    +    /**
    +     * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
    +     * @param pattern The Pattern to use.
    +     */
    +    public PatternTopicFilter(Pattern pattern) {
    +        this.pattern = pattern;
    +    }
    +
    +    @Override
    +    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
    +        List<TopicPartition> allPartitions = new ArrayList<>();
    +        for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
    +            if (pattern.matcher(entry.getKey()).matches()) {
    +                for (PartitionInfo partitionInfo: entry.getValue()) {
    +                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    +                }
    +            }
    +        }
    +        return allPartitions;
    +    }
    +
    +    @Override
    +    public String getTopicsString() {
    +        return pattern.pattern();
    --- End diff --
    
    @hmcl I went and tried out storm-kafka-monitor from the https://github.com/apache/storm/pull/2174 branch, and ran into multiple issues.
    
    * The PatternTopicFilter code throws an NPE because the spout calls getTopicsString before the Set is initialized (the method is used for logging). This prevents the topology from being submitted.
    * It looks like the Kafka consumer code is not being correctly shaded into the storm-kafka-monitor jar, so the bash script didn't work for me. I'm not sure if this was a quirk of my build, since the consumer code is present in the 1.1.0 release storm-kafka-monitor jar.
    * After fixing the two other issues, I tried submitting a topology consuming the "test*" pattern from a Kafka instance where topics "test", "test-1" and "test-2" were present. The Storm UI offset lag request returns the following, no matter how long I wait after the topology is running:
    
    ```
    {"kafka-spout":{"spoutId":"kafka-spout","errorInfo":"Unable to get offset lags for kafka. Reason: org.apache.kafka.shaded.common.errors.InvalidTopicException: Topic '' is invalid\n","spoutType":"KAFKA"}}
    ```
    
    That code (https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java#L91) gets the topic list it should request offset lag for via the spout's getComponentOffsets method, as far as I can tell through this call https://github.com/apache/storm/blob/9e31509d47c4e91c1009f55c7ccf321d7d7e63aa/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java#L541. I don't think we can make the current storm-kafka-monitor implementation work for subscriptions that are not known at topology submission time, unless there is some way to update the component configuration when the subscription changes.
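    
    One detail worth checking when reproducing the pattern subscription: the filter in the diff uses `pattern.matcher(...).matches()`, which requires the whole topic name to match the regex. Below is a minimal, self-contained sketch of that check. The class and method names are hypothetical, and it is an assumption that the pattern is supplied as a Java regex string; which exact regex the test above used is not stated.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    // Hypothetical demo class; not part of storm-kafka-client.
    class TopicPatternDemo {

        // Mirrors the filter's check: a topic passes only if its whole name matches the regex.
        static List<String> matchingTopics(Pattern pattern, List<String> topics) {
            List<String> matched = new ArrayList<>();
            for (String topic : topics) {
                if (pattern.matcher(topic).matches()) {
                    matched.add(topic);
                }
            }
            return matched;
        }

        public static void main(String[] args) {
            List<String> topics = Arrays.asList("test", "test-1", "test-2", "other");
            // Regex "test.*": "test" followed by any characters.
            System.out.println(matchingTopics(Pattern.compile("test.*"), topics)); // prints [test, test-1, test-2]
            // Regex "test*": "tes" followed by zero or more 't' characters.
            System.out.println(matchingTopics(Pattern.compile("test*"), topics)); // prints [test]
        }
    }
    ```
    
    With full-match semantics, the regex `test.*` selects all three test topics, while `test*` read as a regex selects only `test`.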
    
    I'd also add that I have a few other issues with storm-kafka-monitor:
    * The jar is installed along with the cluster, and depends on the Kafka version specified in Storm's root POM. Kafka guarantees backwards-compatible client-server communication for one release only, so there's a potential coupling between Storm cluster version and Kafka version. If users want to update the Kafka version in storm-kafka-monitor, they have to rebuild that module and replace the jar in their Storm install.
    * The UI integration uses the storm-kafka-monitor Bash script to start the monitoring code, in order to avoid a dependency between storm-core and storm-kafka-monitor. This prevents the UI integration from working on Windows. We could supply a Windows script as well, but then we'd need to keep the two in sync.
    
    I think it makes sense to make the change you are suggesting simply because it makes the spout log a little nicer, but it still leaves the issue with storm-kafka-monitor not working for pattern subscriptions. It would be good if we could figure out a solution to the other issues as well. Maybe we could track offset lag through the metrics system instead, and just make the spout update it?
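    
    To make the metrics idea a bit more concrete, here is a rough sketch of the per-partition calculation the spout could report, taking lag to mean end offset minus committed offset. Everything here is hypothetical: the class, the method, and the string keys are for illustration only, and no Storm metrics or Kafka consumer API is used. Since the spout knows its current assignment, partitions discovered after submission would be covered as well.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch; not an existing Storm or Kafka class.
    class OffsetLagSketch {

        // Lag per partition = log end offset - committed offset.
        // Keys are "topic-partition" strings purely for illustration.
        static Map<String, Long> computeLag(Map<String, Long> endOffsets,
                                            Map<String, Long> committedOffsets) {
            Map<String, Long> lag = new HashMap<>();
            for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
                // Assumption: a partition with no commit yet is treated as committed at offset 0.
                long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
                // Clamp at zero in case the committed offset is ahead of a stale end offset.
                lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
            }
            return lag;
        }

        public static void main(String[] args) {
            Map<String, Long> endOffsets = new HashMap<>();
            endOffsets.put("test-1-0", 120L);
            endOffsets.put("test-2-0", 40L);
            Map<String, Long> committedOffsets = new HashMap<>();
            committedOffsets.put("test-1-0", 100L);
            System.out.println(computeLag(endOffsets, committedOffsets));
        }
    }
    ```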
    
    I'll create a separate issue for discussing storm-kafka-monitor.


