[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036625#comment-15036625 ]
Dan Dutrow edited comment on SPARK-12103 at 12/2/15 10:38 PM:
--------------------------------------------------------------

After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata), it appears that the Left value of the tuple is not the topic name but rather a key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#impl_producer

I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.


was (Author: dutrow):
After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata), it appears that the Left value of the tuple is not the topic name but rather a key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#compaction

I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>            Reporter: Dan Dutrow
>             Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the
> easiest thing to get started with. The Kafka Receiver API still needs to
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case
> where you would want to use one (or a few) Kafka Streaming Receiver to pool
> resources.
> I have 10+ topics and don't want to dedicate 10 cores to
> processing all of them. However, when reading the data produced by
> KafkaUtils.createStream, the DStream[(String,String)] does not properly
> insert the topic name into the tuple. The left-key is always null, making it
> impossible to know what topic that data came from other than stashing your
> key into the value. Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
>   "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(
>   i =>
>     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, consumerProperties,
>       topics,
>       StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>     ......
>   }
> })
> //// END CODE
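
To make the behaviour described in the comment concrete, here is a minimal, self-contained sketch in plain Scala. It does not use Spark or Kafka. `Record` is a hypothetical stand-in for kafka.message.MessageAndMetadata, and `asKeyValue` / `asTopicValue` are illustrative names, not part of either library. It assumes the sample messages were produced without a key, which would match the null left value reported in the issue.

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata; only the
// fields relevant to this issue are modelled.
final case class Record(topic: String, key: String, message: String)

object TopicTagging {
  // Mapping described in the comment: the tuple's left value is the
  // message key, so the topic name is dropped.
  def asKeyValue(r: Record): (String, String) = (r.key, r.message)

  // Alternative mapping that keeps the topic name instead of the key.
  def asTopicValue(r: Record): (String, String) = (r.topic, r.message)

  // Assumption: these messages were produced without a key, so key is null.
  val sample: Seq[Record] = Seq(
    Record("topicA", null, "a-1"),
    Record("topicB", null, "b-1"),
    Record("topicA", null, "a-2"))

  // Every left value is null; the originating topic cannot be recovered.
  val byKey: Seq[(String, String)] = sample.map(asKeyValue)

  // With the topic retained, values can be routed per topic.
  val byTopic: Map[String, Seq[String]] =
    sample.map(asTopicValue)
      .groupBy(_._1)
      .map { case (topic, pairs) => topic -> pairs.map(_._2) }
}
```

With the first mapping, every tuple from every topic looks the same on the left side, which is the situation the `key match` in the reporter's snippet runs into. The second mapping is only an illustration of what a topic-carrying tuple would allow.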