The Kafka partition number is the same as the Spark partition number.  The
messages for a given partition are in offset order without gaps, so you can
use the offset range to determine the offset for a given message: start at
the range's first offset and add one for each message you consume.  Or you
can use the messageHandler argument to KafkaUtils.createDirectStream to get
access to all of the MessageAndMetadata, including partition and offset, on
a per-message basis.
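
To make the offset arithmetic concrete, here is a rough sketch in plain
Java. It is not Spark code: Range is a hypothetical stand-in for one entry
of the stream's offset ranges, and tagWithOffsets and the topic name
"events" are made up for illustration. It only shows the counting, assuming
the iterator yields one partition's messages in offset order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class OffsetFromRange {
    // Hypothetical stand-in for one Kafka offset range:
    // [fromOffset, untilOffset) for a single topic partition.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;   // inclusive
        final long untilOffset;  // exclusive

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Walks one partition's messages in order and labels each with the
    // offset derived from the range: first message gets fromOffset, the
    // next fromOffset + 1, and so on.
    static List<String> tagWithOffsets(Range range, Iterator<String> messages) {
        List<String> out = new ArrayList<>();
        long offset = range.fromOffset;
        while (messages.hasNext()) {
            if (offset >= range.untilOffset) {
                throw new IllegalStateException("more messages than the range covers");
            }
            String msg = messages.next();
            out.add(range.topic + "/" + range.partition + "@" + offset + ": " + msg);
            offset++;
        }
        return out;
    }

    public static void main(String[] args) {
        Range range = new Range("events", 3, 100L, 103L);
        Iterator<String> messages = Arrays.asList("a", "b", "c").iterator();
        for (String line : tagWithOffsets(range, messages)) {
            System.out.println(line);
        }
    }
}
```

In a real job the range would, as far as I know, come from casting the RDD
to HasOffsetRanges and picking the entry for the current Spark partition;
check that against the Spark version you are running.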

On Tue, Jul 28, 2015 at 7:48 AM, Shushant Arora wrote:

> Hi
> I am processing Kafka messages using Spark Streaming 1.3.
> I am using the mapPartitions function to process Kafka messages.
> How can I access the offset number of the individual message being processed?
> JavaPairInputDStream<byte[], byte[]> directKafkaStream
> =KafkaUtils.createDirectStream(..);
> directKafkaStream.mapPartitions(new
> FlatMapFunction<Iterator<Tuple2<byte[],byte[]>>, String>() {
> public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> t)
> throws Exception {
> while(t.hasNext()){
> Tuple2<byte[], byte[]> tuple = t.next();
> byte[] key = tuple._1();
> byte[] msg = tuple._2();
>  // how to get the Kafka partition number and offset of this message?
>  }
> }
> });

