Each partition belongs to exactly one topic, so you're probably better off dealing with topics per partition rather than at the individual message level.
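
A rough sketch of the per-partition approach, assuming the Spark 1.x direct stream API (the spark-streaming-kafka artifact for Kafka 0.8) and the key/value types from your snippet. The name withTopic is only illustrative, and I haven't run this against your setup:

```scala
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical helper: tags every message with the topic of its partition.
def withTopic(ssc: StreamingContext,
              kafkaParams: Map[String, String],
              topics: Set[String]): DStream[(String, Array[Byte])] = {
  val stream = KafkaUtils.createDirectStream[
    String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

  // The cast only works on the RDD produced directly by the direct stream,
  // so this must be the first transformation applied to it.
  stream.transform { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.mapPartitionsWithIndex { (i, iter) =>
      // RDD partition i corresponds to offsetRanges(i), which names one topic.
      val topic = offsetRanges(i).topic
      iter.map { case (_, value) => (topic, value) }
    }
  }
}
```

Note that the topics argument is a Set[String], e.g. Set("topic1", "topic2").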
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

Look at the discussion of "HasOffsetRanges".

If you really want to attach a topic to each message, look at the createDirectStream overload that allows you to pass a messageHandler argument. That gives you per-item access to everything in the message and metadata, including the topic.

On Wed, Mar 16, 2016 at 3:37 AM, Imre Nagi <imre.nagi2...@gmail.com> wrote:
> Hi,
>
> I'm just trying to process the data that come from the kafka source in my
> spark streaming application. What I want to do is get the pair of topic and
> message in a tuple from the message stream.
>
> Here is my streams:
>
>> val streams = KafkaUtils.createDirectStream[String, Array[Byte],
>> StringDecoder, DefaultDecoder](ssc,kafkaParameter,
>> Array["topic1", "topic2])
>
>
> I have done several things, but still failed when i did some transformations
> from the streams to the pair of topic and message. I hope somebody can help
> me here.
>
> Thanks,
> Imre
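
P.S. For the messageHandler route, something along these lines should work, again assuming the Spark 1.x / Kafka 0.8 integration. The name topicTaggedStream is only illustrative. This overload takes explicit starting offsets (fromOffsets) instead of a set of topic names, so you have to supply those yourself:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: builds a stream of (topic, message) pairs directly.
def topicTaggedStream(ssc: StreamingContext,
                      kafkaParams: Map[String, String],
                      fromOffsets: Map[TopicAndPartition, Long])
    : InputDStream[(String, Array[Byte])] = {
  // Called once per Kafka message; mmd also exposes key, partition and offset.
  val handler = (mmd: MessageAndMetadata[String, Array[Byte]]) =>
    (mmd.topic, mmd.message())

  KafkaUtils.createDirectStream[
    String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](
    ssc, kafkaParams, fromOffsets, handler)
}
```

fromOffsets maps each TopicAndPartition you want to read to the offset to start from, e.g. Map(TopicAndPartition("topic1", 0) -> 0L); the partition numbers and offsets here are placeholders.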