Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2150#discussion_r123886463
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
 ---
    @@ -0,0 +1,59 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +
    +/**
    + * Filter that returns all partitions for topics matching the given {@link 
Pattern}.
    + */
    +public class PatternTopicFilter implements TopicFilter {
    +    
    +    private final Pattern pattern;
    +
    +    /**
    +     * Creates filter based on a Pattern. Only topic names matching the 
Pattern are passed by the filter.
    +     * @param pattern The Pattern to use.
    +     */
    +    public PatternTopicFilter(Pattern pattern) {
    +        this.pattern = pattern;
    +    }
    +
    +    @Override
    +    public List<TopicPartition> 
getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
    +        List<TopicPartition> allPartitions = new ArrayList<>();
    +        for (Map.Entry<String, List<PartitionInfo>> entry: 
consumer.listTopics().entrySet()) {
    +            if (pattern.matcher(entry.getKey()).matches()) {
    +                for (PartitionInfo partitionInfo: entry.getValue()) {
    +                    allPartitions.add(new 
TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    +                }
    +            }
    +        }
    +        return allPartitions;
    +    }
    +
    +    @Override
    +    public String getTopicsString() {
    +        return pattern.pattern();
    --- End diff --
    
    @srdo I believe that the approach I am suggesting in the Trident PR will 
solve what both you and I have in mind. Please take a look at the following:
    
    
https://github.com/hmcl/storm-apache/blob/ca420a4a960c9667262833add74e231fe8d5518b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java#L60-L67
    
    
https://github.com/hmcl/storm-apache/blob/ca420a4a960c9667262833add74e231fe8d5518b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java#L35
    
    
https://github.com/hmcl/storm-apache/blob/ca420a4a960c9667262833add74e231fe8d5518b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java#L53


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to