You don't have to use some other package in order to get access to the offsets.
Shushant, have you read the available documentation at

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

or watched

https://www.youtube.com/watch?v=fXnNEq1v3VA

With the direct stream, Kafka partitions map one-to-one to Spark partitions. The messages for a given partition are in offset order without gaps, so you can use the offset range for that partition to determine the offset of a given message: the first message is at the range's starting offset, the next one at that offset plus one, and so on.

Or you can use the messageHandler argument to KafkaUtils.createDirectStream to get access to all of the MessageAndMetadata, including partition and offset, on a per-message basis.

On Tue, Jul 28, 2015 at 7:48 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

> Hi
>
> I am processing Kafka messages using Spark Streaming 1.3.
>
> I am using the mapPartitions function to process the Kafka messages.
> How can I access the offset number of the individual message being processed?
>
> JavaPairInputDStream<byte[], byte[]> directKafkaStream =
>     KafkaUtils.createDirectStream(..);
>
> directKafkaStream.mapPartitions(new
>     FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() {
>   public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> t)
>       throws Exception {
>
>     while (t.hasNext()) {
>       Tuple2<byte[], byte[]> tuple = t.next();
>       byte[] key = tuple._1();
>       byte[] msg = tuple._2();
>       // how to get the Kafka partition number and offset of this message
>     }
>   }
> });
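For what it's worth, here is a minimal sketch of the offset-range arithmetic mentioned in the reply. It deliberately runs without Spark: PartitionRange is a hypothetical stand-in for the topic, partition, fromOffset and untilOffset fields of Spark's OffsetRange, and tagWithOffsets is an illustrative helper name, not part of any library. It assumes, as stated in the reply, that the messages of a partition arrive in offset order without gaps.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the fields of Spark's OffsetRange,
// so the sketch compiles without Spark on the classpath.
final class PartitionRange {
    final String topic;
    final int partition;
    final long fromOffset;   // offset of the first message in the batch (inclusive)
    final long untilOffset;  // offset just past the last message (exclusive)

    PartitionRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

class OffsetRangeSketch {
    // Labels each message with topic/partition@offset. Because messages are
    // in offset order without gaps, the i-th message sits at fromOffset + i.
    static List<String> tagWithOffsets(PartitionRange range, Iterator<byte[]> messages) {
        List<String> out = new ArrayList<>();
        long offset = range.fromOffset;
        while (messages.hasNext()) {
            String body = new String(messages.next(), StandardCharsets.UTF_8);
            out.add(range.topic + "/" + range.partition + "@" + offset + ": " + body);
            offset++;
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up example values: topic "events", partition 3, offsets 100 and 101.
        PartitionRange range = new PartitionRange("events", 3, 100L, 102L);
        List<byte[]> batch = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8));
        for (String line : tagWithOffsets(range, batch.iterator())) {
            System.out.println(line);
        }
    }
}
```

In an actual job the range would not be built by hand. Per the integration guide linked above, the ranges for a batch come from casting the RDD produced by the direct stream, ((HasOffsetRanges) rdd.rdd()).offsetRanges(), and the entry for the current task would presumably be picked by the Spark partition index (for example via TaskContext.get().partitionId()). That wiring is not shown or tested here, so treat it as an assumption to check against the guide.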