[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dan Dutrow updated SPARK-12103:
-------------------------------
    Description: 
(Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers.)

When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String, String)] does not properly insert the topic name into the tuple. The left key is always null, making it impossible to know what topic the data came from other than stashing the topic name into the value yourself. Is there a way around that problem?

//// CODE
val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)

val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties,
    topics,
    StorageLevel.MEMORY_ONLY_SER))

val unioned: DStream[(String, String)] = ssc.union(streams)

unioned.flatMap(x => {
  val (key, value) = x
  // key is always null!
  // value has data from any one of my topics
  key match {
    ......
  }
})
//// END CODE

  was:
The default way of creating a stream out of a Kafka source would be:

val stream = KafkaUtils.createStream(ssc, "localhost:2181", "logs", Map("retarget" -> 2, "datapair" -> 2))

However, if the two topics -- in this case "retarget" and "datapair" -- are very different, there is no way to set up different filters, mapping functions, etc., as they are effectively merged. However, the KafkaInputDStream instance created by this call internally calls ConsumerConnector.createMessageStreams(), which returns a *map* of KafkaStreams, keyed by topic. It would be great if this map were exposed somehow, so that a call such as

val streamS = KafkaUtils.createStreamS(...)
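One workaround, sketched below with plain Scala collections standing in for the DStreams (this is not Spark API code, and the topic names and payloads are made up for illustration): create one stream per topic and map the topic name into the tuple yourself, before the union, so the null key no longer matters downstream.

```scala
// Hypothetical stand-in for the per-topic receiver streams: each "stream"
// here is just a Seq of (key, value) pairs with a null key, mirroring what
// KafkaUtils.createStream actually delivers in the report above.
val topicData: Map[String, Seq[(String, String)]] = Map(
  "topicA" -> Seq((null, "a1"), (null, "a2")),
  "topicB" -> Seq((null, "b1"))
)

// The workaround: tag each message with its topic *before* unioning, so
// downstream code can match on the topic instead of the (null) key.
val tagged: Seq[(String, String)] = topicData.toSeq.flatMap {
  case (topic, msgs) => msgs.map { case (_, value) => (topic, value) }
}
```

In actual Spark Streaming code the same idea would be a per-topic `KafkaUtils.createStream(..., Map(topic -> 1), ...).map { case (_, v) => (topic, v) }` followed by `ssc.union(...)`. Note the trade-off: this uses one receiver per topic, which is exactly the resource cost the reporter is trying to avoid by pooling topics into shared receivers.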
returned a map of streams.

Regards,
Sergey Malov
Collective Media


> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Dan Dutrow
>             Fix For: 1.0.1
>


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)