[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193702#comment-15193702 ]
ASF GitHub Bot commented on STORM-822: -------------------------------------- Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/1131#discussion_r56041205 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Represents the output streams associated with each topic, and provides a public API to + * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. + */ +public class KafkaSpoutStreams implements Serializable { + private final Map<String, KafkaSpoutStream> topicToStream; + + private KafkaSpoutStreams(Builder builder) { + this.topicToStream = builder.topicToStream; + } + + /** + * @param topic the topic for which to get output fields + * @return the output fields declared + */ + public Fields getOutputFields(String topic) { + if (topicToStream.containsKey(topic)) { + return topicToStream.get(topic).getOutputFields(); + } + throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); + } + + /** + * @param topic the topic to for which to get the stream id + * @return the id of the stream to where the tuples are emitted + */ + public String getStreamId(String topic) { + if (topicToStream.containsKey(topic)) { + return topicToStream.get(topic).getStreamId(); + } + throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); + } + + /** + * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} + */ + public List<String> getTopics() { + return new ArrayList<>(topicToStream.keySet()); + } + + void declareOutputFields(OutputFieldsDeclarer declarer) { + for (KafkaSpoutStream stream : topicToStream.values()) { + declarer.declareStream(stream.getStreamId(), stream.getOutputFields()); + } + } + + void emit(SpoutOutputCollector collector, MessageId messageId) { + collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId); --- End diff -- @abhishekagarwal87 @harshach @revans2 I have a patch I am about to upload that I believe addresses most or all of these points. It will give an option to have a behavior identical to the previous Kafka Spout, and an option to keep an upper limit on the amount of memory (i.e. pending records in memory) used by each spout. > As a storm developer I’d like to use the new kafka consumer API (0.8.3) to > reduce dependencies and use long term supported kafka apis > -------------------------------------------------------------------------------------------------------------------------------------- > > Key: STORM-822 > URL: https://issues.apache.org/jira/browse/STORM-822 > Project: Apache Storm > Issue Type: Story > Components: storm-kafka > Reporter: Thomas Becker > Assignee: Hugo Louro > -- This message was sent by Atlassian JIRA (v6.3.4#6332)