Hi, reading the official doc
<http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>,
I think you can do it this way:

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  // The cast to HasOffsetRanges only succeeds on the RDD produced directly by
  // the direct stream, i.e. in the first method called on it
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
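
If you need the topic attached to every record rather than just printed once per batch, here is a minimal sketch along the same lines (the name topicAndValue is just illustrative). It relies on the direct stream's RDD partitions being in one-to-one correspondence with the Kafka partitions, so the partition index can be used to look up the matching OffsetRange:

 // Sketch, not verbatim from the docs: tag every (key, value) pair with the
 // topic of the Kafka partition it came from
 val topicAndValue = directKafkaStream.transform { rdd =>
   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd.mapPartitionsWithIndex { (idx, iter) =>
     val topic = ranges(idx).topic  // partition idx corresponds to ranges(idx)
     iter.map { case (_, value) => (topic, value) }
   }
 }

topicAndValue is then a DStream of (topic, message) pairs, so the word count in your snippet could key by topic as well.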

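Alternatively, since you are on Spark 2.*, the newer spark-streaming-kafka-0-10 integration gives you the topic for free: each element of the direct stream is a ConsumerRecord that exposes it directly. Rough sketch, assuming 0-10 style kafkaParams (bootstrap.servers, key/value deserializer classes) rather than the 0-8 ones:

 import org.apache.spark.streaming.kafka010._
 import org.apache.kafka.clients.consumer.ConsumerRecord

 // kafkaParams here must follow the new consumer config, not the 0-8 one
 val stream = KafkaUtils.createDirectStream[String, String](
   ssc,
   LocationStrategies.PreferConsistent,
   ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

 // each record carries its topic, partition and offset
 stream.map(record => (record.topic, record.value))
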

2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>:

> Hi, All,
>   I have one issue here about how to process multiple Kafka topics in a
> Spark 2.* program. My question is: How to get the topic name from a message
> received from Kafka? E.g:
>
> ......
>     val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](
>       ssc, kafkaParams, topicsSet)
>
>     // Get the lines, split them into words, count the words and print
>     val lines = messages.map(_._2)
>     val words = lines.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>     wordCounts.print()
> ......
>
> Kafka sends the messages on multiple topics, e.g. through the console
> producer. But when Spark receives a message, how will it know which topic
> this piece of message is coming from? Thanks a lot for any of your help!
>
> Cheers,
> Dan
>



-- 
Alonso Isidoro Roman
https://about.me/alonso.isidoro.roman
