[
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036625#comment-15036625
]
Dan Dutrow commented on SPARK-12103:
------------------------------------
After digging into the Kafka code some more (specifically
kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator, and
kafka.message.MessageAndMetadata), it appears that the left value of the tuple
is not the topic name but the per-message key that Kafka attaches to each
message. See http://kafka.apache.org/documentation.html#compaction
I don't see a way around this without hacking KafkaStream and ConsumerIterator
to return the topic name instead of the message key.
The return value should probably be clarified in the documentation.
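For reference, a minimal sketch of where the tuple's two halves come from (field names per the Kafka 0.8 consumer API; `mam` is a hypothetical record, so this is illustrative rather than runnable against a live consumer): ConsumerIterator yields a MessageAndMetadata, which carries the topic alongside the key and message, but the receiver keeps only the key and message:

```scala
import kafka.message.MessageAndMetadata

// MessageAndMetadata exposes topic, partition, offset, key() and message().
// The receiver builds its (K, V) tuple from key() and message() only, so
// mam.topic is dropped, and a message produced without a key yields (null, value).
def toTuple(mam: MessageAndMetadata[String, String]): (String, String) =
  (mam.key(), mam.message())
```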
> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
> Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the
> easiest thing to get started with. The Kafka Receiver API still needs to
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case
> where you would want to pool resources. I have 10+ topics and don't want to
> dedicate 10 cores to processing all of them. However, when reading the data
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not
> properly insert the topic name into the tuple. The left element of each tuple
> is always null, making it impossible to know what topic the data came from,
> other than stashing your key into the value. Is there a way around that
> problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
>   "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(
>   i =>
>     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, consumerProperties,
>       topics,
>       StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap { x =>
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match {
>     ......
>   }
> }
> //// END CODE
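Until the return value is fixed or documented, one workaround is to create a separate stream per topic and tag each record with its topic name in a map step, instead of relying on the message key. A sketch, reusing `ssc` and `consumerProperties` from the snippet above and assuming per-topic receivers are acceptable:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver per topic; replace the (possibly null) message key with the
// topic name so downstream code can tell where each record came from.
val perTopic: Seq[DStream[(String, String)]] =
  Seq("topicA", "topicB", "topicC").map { topic =>
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
      .map { case (_, value) => (topic, value) }
  }
val unioned: DStream[(String, String)] = ssc.union(perTopic)
```

Note the tradeoff: this dedicates a receiver (and thus a core) to each topic, which is exactly the resource pooling the pooled-receiver use case wanted to avoid; the alternative mentioned in the issue is to stash the topic name into the message value at produce time.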
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]