Hi,Alonso. Thanks! I've read about this but did not quite understand it. To pick out the topic name of a kafka message seems a simple task but the example code looks so complicated with redundent info. Why do we need offsetRanges here and do we have a easy way to achieve this?
Cheers, Dan 2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman <alons...@gmail.com>: > Hi, reading the official doc > <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>, > i think you can do it this way: > > import org.apache.spark.streaming.kafka._ > > val directKafkaStream = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder]( > > ssc, kafkaParams, topicsSet) > > > // Hold a reference to the current offset ranges, so it can be used > downstream > var offsetRanges = Array.empty[OffsetRange] > > directKafkaStream.transform { rdd => > offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > rdd > }.map { > ... > }.foreachRDD { rdd => > for (o <- offsetRanges) { > println(*s"${o.topic}* ${o.partition} ${o.fromOffset} ${o.untilOffset}") > } > > } > > > 2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>: > >> Hi, All, >> I have one issue here about how to process multiple Kafka topics in a >> Spark 2.* program. My question is: How to get the topic name from a message >> received from Kafka? E.g: >> >> ...... >> val messages = KafkaUtils.createDirectStream[String, String, >> StringDecoder, StringDecoder]( >> ssc, kafkaParams, topicsSet) >> >> // Get the lines, split them into words, count the words and print >> val lines = messages.map(_._2) >> val words = lines.flatMap(_.split(" ")) >> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) >> wordCounts.print() >> ...... >> >> Kafka send the messages in multiple topics through console producer for >> example. But when Spark receive the message, how it will know which topic >> is this piece of message coming from? Thanks a lot for any of your helps! >> >> Cheers, >> Dan >> > > > > -- > Alonso Isidoro Roman > [image: https://]about.me/alonso.isidoro.roman > > <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links> >