[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037773#comment-15037773
 ] 

Cody Koeninger commented on SPARK-12103:
----------------------------------------

A cursory review of the Kafka project documentation should reveal that messages 
have a key (used for distribution among partitions) and a value. Why would one 
reasonably expect that Spark documentation referring to a Kafka message key was 
instead supposed to be the message topic?

If you really want the topic name in each item of the rdd, create your separate 
streams, map over them to add the topic name, then union them together into a 
single stream.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.4.1
>            Reporter: Dan Dutrow
>            Priority: Minor
>             Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data procuced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left-key always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>    val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>       ......
>   }
> }
> //// END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to