There's 1 topic per partition, so you're probably better off dealing
with topics that way rather than at the individual message level.

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

Look at the discussion of "HasOffsetRanges"

If you really want to attach a topic to each message, look at the
constructor that allows you to pass a messageHandler argument.  That
gives you per-item access to everything in message and metadata,
including the topic.

On Wed, Mar 16, 2016 at 3:37 AM, Imre Nagi <imre.nagi2...@gmail.com> wrote:
> Hi,
>
> I'm just trying to process the data that come from the kafka source in my
> spark streaming application. What I want to do is get the pair of topic and
> message in a tuple from the message stream.
>
> Here is my streams:
>
>>  val streams = KafkaUtils.createDirectStream[String, Array[Byte],
>> StringDecoder, DefaultDecoder](ssc,kafkaParameter,
>>       Array["topic1", "topic2])
>
>
> I have done several things, but still failed when i did some transformations
> from the streams to the pair of topic and message. I hope somebody can help
> me here.
>
> Thanks,
> Imre

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to