[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dan Dutrow updated SPARK-12103:
-------------------------------
    Description: 
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers.)
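
For reference, the Direct API can carry the topic name through to the stream: 
the createDirectStream overload that takes a messageHandler hands you the full 
MessageAndMetadata for each record. A minimal sketch, assuming ssc and 
kafkaParams are already defined and using placeholder starting offsets:

//// CODE

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder offsets; in practice these come from ZooKeeper, a checkpoint,
// or wherever offsets are tracked.
val fromOffsets = Map(
  TopicAndPartition("topicA", 0) -> 0L,
  TopicAndPartition("topicB", 0) -> 0L)

// The messageHandler sees the full metadata, so the topic name can be kept
// alongside each record.
val tagged = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()))

//// END CODE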

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left key is always null, 
making it impossible to know which topic the data came from, other than 
stashing it into the value yourself. Is there a way around that problem?

//// CODE

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
  "topicE" -> 1, "topicF" -> 1, ...)

// Three receivers, each subscribed to every topic in the map.
val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(i =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties,
    topics,
    StorageLevel.MEMORY_ONLY_SER))

val unioned: DStream[(String, String)] = ssc.union(streams)

unioned.flatMap(x => {
  val (key, value) = x
  // key is always null!
  // value has data from any one of my topics
  key match {
      ......
  }
})

//// END CODE
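
One workaround is to give up the pooling: create one stream per topic and tag 
the topic into the tuple yourself before the union, at the cost of one 
receiver (core) per topic. A sketch, reusing ssc, consumerProperties, and 
topics from the code above:

//// CODE

// One receiver per topic; each record is tagged with its topic before the
// union, so downstream matching on the topic works.
val perTopic: Seq[DStream[(String, String)]] = topics.keys.toSeq.map { topic =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
    .map { case (_, value) => (topic, value) }
}
val taggedUnion: DStream[(String, String)] = ssc.union(perTopic)

//// END CODE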




  was:
The default way of creating a stream from a Kafka source would be:

    val stream = KafkaUtils.createStream(ssc, "localhost:2181", "logs",
      Map("retarget" -> 2, "datapair" -> 2))

However, if the two topics - in this case "retarget" and "datapair" - are very 
different, there is no way to set up different filters, mapping functions, 
etc., as they are effectively merged into a single stream.

The KafkaInputDStream instance created by this call internally calls 
ConsumerConnector.createMessageStreams(), which returns a *map* of 
KafkaStreams keyed by topic. It would be great if this map were exposed 
somehow, so that a call such as

    val streamS = KafkaUtils.createStreamS(...)

returned a map of streams.
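
In the meantime, a helper along these lines approximates that from the 
outside. This is a sketch only, not an existing Spark API, and it creates one 
receiver per topic rather than exposing the consumer connector's internal map 
(assuming ssc is an existing StreamingContext):

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Hypothetical helper: one receiver stream per topic, keyed by topic name.
    def createStreamsByTopic(
        ssc: StreamingContext,
        zkQuorum: String,
        groupId: String,
        topics: Map[String, Int]): Map[String, ReceiverInputDStream[(String, String)]] =
      topics.map { case (topic, threads) =>
        topic -> KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> threads))
      }

    // Each topic can then get its own filters and mapping functions:
    val byTopic = createStreamsByTopic(ssc, "localhost:2181", "logs",
      Map("retarget" -> 2, "datapair" -> 2))
    byTopic("retarget").map(_._2).print()
    byTopic("datapair").filter { case (_, v) => v.nonEmpty }.print()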

Regards,
Sergey Malov
Collective Media


> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Dan Dutrow
>             Fix For: 1.0.1
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers.)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know which topic the data came from, other than 
> stashing it into the value yourself. Is there a way around that problem?
> //// CODE
> import kafka.serializer.StringDecoder
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
> import org.apache.spark.streaming.kafka.KafkaUtils
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
>   "topicE" -> 1, "topicF" -> 1, ...)
> // Three receivers, each subscribed to every topic in the map.
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match {
>     ......
>   }
> })
> //// END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
