Hi, reading the official docs <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>, I think you can do it this way:
import org.apache.spark.streaming.kafka._

val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>:

> Hi, All,
>   I have an issue with processing multiple Kafka topics in a
> Spark 2.* program. My question is: how can I get the topic name from a
> message received from Kafka? E.g.:
>
> ......
> val messages = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
> // Get the lines, split them into words, count the words and print
> val lines = messages.map(_._2)
> val words = lines.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
> ......
>
> Kafka sends the messages to multiple topics through the console producer,
> for example. But when Spark receives a message, how will it know which
> topic this message came from? Thanks a lot for any help!
>
> Cheers,
> Dan

--
Alonso Isidoro Roman
about.me/alonso.isidoro.roman
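Note that the offset-range approach above gives you the topic per partition, not per individual record, so the downstream word count still needs to be keyed by topic if you want the counts separated. As a plain-Scala sketch (no Spark, hypothetical sample data) of what that keying looks like once each message is paired with its topic:

```scala
// Hypothetical sample of (topic, message) pairs, standing in for what a
// Kafka direct stream would deliver once the topic is kept with each record.
val records = Seq(
  ("topicA", "spark streaming"),
  ("topicB", "kafka topics"),
  ("topicA", "spark kafka")
)

// Same word-count logic as the reduceByKey in the question, but keyed by
// (topic, word) so counts from different topics stay separate.
val wordCountsPerTopic: Map[(String, String), Long] =
  records
    .flatMap { case (topic, msg) => msg.split(" ").map(w => ((topic, w), 1L)) }
    .groupBy(_._1)
    .map { case (key, pairs) => key -> pairs.map(_._2).sum }

wordCountsPerTopic.foreach { case ((topic, word), count) =>
  println(s"$topic $word $count")
}
```

In an actual job the same `((topic, word), 1L)` pairing would go through `reduceByKey(_ + _)` on the DStream instead of `groupBy`/`sum` on a local collection.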