@Dan, shouldn't you be using Datasets/DataFrames? I have heard it is
recommended to use Datasets and DataFrames rather than DStreams, since
DStreams are in maintenance mode.
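For what it's worth, here is a minimal sketch of the Dataset/DataFrame route, assuming a Spark 2.x release that ships the Kafka source for Structured Streaming (the spark-sql-kafka-0-10 artifact on the classpath). The Kafka source exposes the topic as an ordinary column next to the value, so the word count can simply be grouped by it. The broker address localhost:9092, the topic names topicA/topicB, and the object and method names are placeholders I made up, not anything from this thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TopicAwareWordCount {
  // Keep the `topic` column next to each word, so counts are per (topic, word).
  // Works on any DataFrame that has `topic` (string) and `value` (binary) columns.
  def wordCountsByTopic(kafkaDf: DataFrame): DataFrame =
    kafkaDf
      .selectExpr("topic", "explode(split(CAST(value AS STRING), ' ')) AS word")
      .groupBy("topic", "word")
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TopicAwareWordCount").getOrCreate()

    // Placeholder broker and topic names (assumptions, adjust to your setup).
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA,topicB")
      .load()

    // Print the running counts per topic and word to the console.
    wordCountsByTopic(kafkaDf)
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Because the topic is just a column here, there is no need to track offset ranges only to find out where a message came from.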

On Mon, Sep 11, 2017 at 7:41 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If you want an "easy" but not particularly performant way to do it, each
> org.apache.kafka.clients.consumer.ConsumerRecord has a topic.
>
> The topic is going to be the same for the entire partition as long as you
> haven't shuffled, hence the examples on how to deal with it at a partition
> level.
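A rough sketch of both options, assuming the spark-streaming-kafka-0-10 integration, where the direct stream yields ConsumerRecord objects. The broker address, group id, topic names, and the object and helper names below are placeholders, not taken from this thread.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object TopicPerRecord {
  // "Easy" option: read the topic off every single record.
  def tagWithTopic(record: ConsumerRecord[String, String]): (String, String) =
    (record.topic, record.value)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TopicPerRecord")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder connection settings (assumptions, adjust to your setup).
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group"
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Seq("topicA", "topicB"), kafkaParams))

    // Per-record lookup: (topic, value) pairs.
    stream.map(tagWithTopic).print()

    // Partition-level lookup: before any shuffle, each RDD partition maps to
    // exactly one Kafka topic-partition, so the topic is looked up once.
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { records =>
        val topic = offsetRanges(TaskContext.get.partitionId).topic
        records.foreach(r => println(s"$topic: ${r.value}"))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The per-record version is simpler to read. The partition-level version resolves the topic once per partition instead of once per message, which is what the offset-range examples are for.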
>
> On Fri, Sep 8, 2017 at 8:29 PM, Dan Dong <dongda...@gmail.com> wrote:
>
>> Hi, Alonso.
>>   Thanks! I've read about this but did not quite understand it. Picking
>> out the topic name of a Kafka message seems like a simple task, but the
>> example code looks complicated and full of redundant info. Why do we need
>> offsetRanges here, and is there an easy way to achieve this?
>>
>> Cheers,
>> Dan
>>
>>
>> 2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman <alons...@gmail.com>:
>>
>>> Hi, reading the official doc
>>> <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>,
>>> I think you can do it this way:
>>>
>>> import kafka.serializer.StringDecoder
>>> import org.apache.spark.streaming.kafka._
>>>
>>> val directKafkaStream =
>>>   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>     ssc, kafkaParams, topicsSet)
>>>
>>> // Hold a reference to the current offset ranges, so it can be used downstream
>>> var offsetRanges = Array.empty[OffsetRange]
>>>
>>> directKafkaStream.transform { rdd =>
>>>   // Must be the first operation on the stream: only the RDD that comes
>>>   // straight from Kafka can be cast to HasOffsetRanges
>>>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   rdd
>>> }.map {
>>>           ...
>>> }.foreachRDD { rdd =>
>>>   // Each offset range names the topic and partition it was read from
>>>   for (o <- offsetRanges) {
>>>     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>>>   }
>>> }
>>>
>>>
>>> 2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>:
>>>
>>>> Hi, All,
>>>>   I have a question about processing multiple Kafka topics in a
>>>> Spark 2.* program: how do I get the topic name from a message
>>>> received from Kafka? E.g.:
>>>>
>>>> ......
>>>>     val messages = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](
>>>>       ssc, kafkaParams, topicsSet)
>>>>
>>>>     // Get the lines, split them into words, count the words and print
>>>>     val lines = messages.map(_._2)
>>>>     val words = lines.flatMap(_.split(" "))
>>>>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>>>     wordCounts.print()
>>>> ......
>>>>
>>>> Kafka sends the messages on multiple topics, for example through the
>>>> console producer. When Spark receives a message, how does it know which
>>>> topic that message came from? Thanks a lot for any help!
>>>>
>>>> Cheers,
>>>> Dan
>>>>
>>>
>>>
>>>
>>> --
>>> Alonso Isidoro Roman
>>> https://about.me/alonso.isidoro.roman
>>>
>>
>>
>
