[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-12103. ------------------------------- Resolution: Fixed Fix Version/s: 1.6.1 2.0.0 Issue resolved by pull request 10132 [https://github.com/apache/spark/pull/10132] > KafkaUtils createStream with multiple topics -- does not work as expected > ------------------------------------------------------------------------- > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming > Affects Versions: 1.4.1 > Reporter: Dan Dutrow > Priority: Minor > Fix For: 2.0.0, 1.6.1 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > //// CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { > val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > ...... > } > } > //// END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org