@Dan shouldn't you be using Datasets/DataFrames? I heard it is recommended to use Datasets and DataFrames rather than DStreams, since DStreams is in maintenance mode.
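For what it's worth, if you did move to Structured Streaming as suggested, the Kafka source surfaces the topic name as an ordinary column, so no offset-range bookkeeping is needed. A minimal sketch below; the `readStream` call is left in comments because it needs a live `SparkSession` plus the spark-sql-kafka-0-10 artifact, and the server/topic names in it are placeholders, not values from this thread:

```scala
// The Kafka source in Structured Streaming yields rows that already carry
// the topic name; its fixed schema has these columns:
val kafkaSourceColumns =
  Seq("key", "value", "topic", "partition", "offset", "timestamp", "timestampType")

// With Spark on the classpath, reading the stream would look roughly like:
//   val df = spark.readStream
//     .format("kafka")
//     .option("kafka.bootstrap.servers", "host:9092")   // placeholder
//     .option("subscribe", "topicA,topicB")              // placeholder
//     .load()
//   df.selectExpr("topic", "CAST(value AS STRING)")      // topic per row

println(kafkaSourceColumns.mkString(", "))
```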
On Mon, Sep 11, 2017 at 7:41 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If you want an "easy" but not particularly performant way to do it, each
> org.apache.kafka.clients.consumer.ConsumerRecord has a topic.
>
> The topic is going to be the same for the entire partition as long as you
> haven't shuffled, hence the examples on how to deal with it at a partition
> level.
>
> On Fri, Sep 8, 2017 at 8:29 PM, Dan Dong <dongda...@gmail.com> wrote:
>
>> Hi, Alonso.
>> Thanks! I've read about this but did not quite understand it. Picking
>> out the topic name of a Kafka message seems a simple task, but the
>> example code looks complicated, with redundant info. Why do we need
>> offsetRanges here, and is there an easier way to achieve this?
>>
>> Cheers,
>> Dan
>>
>> 2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman <alons...@gmail.com>:
>>
>>> Hi, reading the official doc
>>> <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>,
>>> I think you can do it this way:
>>>
>>> import org.apache.spark.streaming.kafka._
>>>
>>> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>
>>> // Hold a reference to the current offset ranges, so they can be used
>>> // downstream
>>> var offsetRanges = Array.empty[OffsetRange]
>>>
>>> directKafkaStream.transform { rdd =>
>>>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   rdd
>>> }.map {
>>>   ...
>>> }.foreachRDD { rdd =>
>>>   for (o <- offsetRanges) {
>>>     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>>>   }
>>> }
>>>
>>> 2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>:
>>>
>>>> Hi, All,
>>>> I have one issue here about how to process multiple Kafka topics in a
>>>> Spark 2.* program. My question is: how to get the topic name from a
>>>> message received from Kafka? E.g.:
>>>>
>>>> ......
>>>> val messages = KafkaUtils.createDirectStream[String, String,
>>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>
>>>> // Get the lines, split them into words, count the words and print
>>>> val lines = messages.map(_._2)
>>>> val words = lines.flatMap(_.split(" "))
>>>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>>> wordCounts.print()
>>>> ......
>>>>
>>>> Kafka sends the messages in multiple topics through the console
>>>> producer, for example. But when Spark receives a message, how will it
>>>> know which topic this piece of message came from? Thanks a lot for any
>>>> help!
>>>>
>>>> Cheers,
>>>> Dan
>>>
>>> --
>>> Alonso Isidoro Roman
>>> <https://about.me/alonso.isidoro.roman>
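Cody's per-record suggestion above, keying each message by the topic its ConsumerRecord carries, can be sketched in plain Scala. The `ConsumerRecord` case class here is a simplified stand-in for the real `org.apache.kafka.clients.consumer.ConsumerRecord` (which exposes `topic()` and `value()` accessors), so the counting logic runs without Spark on the classpath:

```scala
// Simplified stand-in for org.apache.kafka.clients.consumer.ConsumerRecord,
// which exposes topic() and value() on every message it delivers.
final case class ConsumerRecord(topic: String, value: String)

// Pairing each word with its record's topic keeps the topic name attached
// through any later shuffle -- the "easy but less performant" route.
def wordCountsByTopic(records: Seq[ConsumerRecord]): Map[(String, String), Long] =
  records
    .flatMap(r => r.value.split(" ").map(w => (r.topic, w)))
    .groupBy(identity)
    .map { case (k, grp) => (k, grp.size.toLong) }

// With the real spark-streaming-kafka-0-10 API the same idea would be,
// roughly, stream.map(r => (r.topic, r.value)) on the DStream of records.
val counts = wordCountsByTopic(Seq(
  ConsumerRecord("topicA", "hello hello"),
  ConsumerRecord("topicB", "hello")))
println(counts)  // counts are keyed by (topic, word)
```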