Hi,Alonso.
  Thanks! I've read about this but did not quite understand it. To pick out
the topic name of a kafka message seems a simple task but the example code
looks so complicated with redundent info. Why do we need offsetRanges here
and do we have a easy way to achieve this?

Cheers,
Dan


2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman <alons...@gmail.com>:

> Hi, reading the official doc
> <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>,
> i think you can do it this way:
>
> import org.apache.spark.streaming.kafka._
>
>    val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
>
>       ssc, kafkaParams, topicsSet)
>
>
>  // Hold a reference to the current offset ranges, so it can be used 
> downstream
>  var offsetRanges = Array.empty[OffsetRange]
>
>  directKafkaStream.transform { rdd =>
>    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>    rdd
>  }.map {
>            ...
>  }.foreachRDD { rdd =>
>    for (o <- offsetRanges) {
>      println(*s"${o.topic}* ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>    }
>
>  }
>
>
> 2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>:
>
>> Hi, All,
>>   I have one issue here about how to process multiple Kafka topics in a
>> Spark 2.* program. My question is: How to get the topic name from a message
>> received from Kafka? E.g:
>>
>> ......
>>     val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](
>>       ssc, kafkaParams, topicsSet)
>>
>>     // Get the lines, split them into words, count the words and print
>>     val lines = messages.map(_._2)
>>     val words = lines.flatMap(_.split(" "))
>>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>     wordCounts.print()
>> ......
>>
>> Kafka send the messages in multiple topics through console producer for
>> example. But when Spark receive the message, how it will know which topic
>> is this piece of message coming from? Thanks a lot for any of your helps!
>>
>> Cheers,
>> Dan
>>
>
>
>
> --
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>

Reply via email to