[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036625#comment-15036625
 ] 

Dan Dutrow edited comment on SPARK-12103 at 12/2/15 10:38 PM:
--------------------------------------------------------------

After digging into the Kafka code some more (specifically 
kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and 
kafka.message.MessageAndMetadata), it appears that the Left value of the tuple 
is not the topic name but rather a key that Kafka puts on each message. See 
http://kafka.apache.org/documentation.html#impl_producer

I don't see a way around this without hacking KafkaStream and ConsumerIterator 
to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.


was (Author: dutrow):
After digging into the Kafka code some more (specifically 
kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and 
kafka.message.MessageAndMetadata), it appears that the Left value of the tuple 
is not the topic name but rather a key that Kafka puts on each message. See 
http://kafka.apache.org/documentation.html#compaction

I don't see a way around this without hacking KafkaStream and ConsumerIterator 
to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>            Reporter: Dan Dutrow
>             Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data procuced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left-key always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>    val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>       ......
>   }
> }
> //// END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to