Thanks for the response, Chaitanya. I will follow your suggestions to retrieve the Kafka partition ID and offset.
Regards,
Raja.

From: Chaitanya Chebolu <chaita...@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Monday, June 13, 2016 at 3:06 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

I think you are using the 0.8 version of the Kafka operator. There is no operator in Malhar that emits the partition ID and offset along with the message. To meet your requirement, please do the following:

Create a new class that extends AbstractKafkaInputOperator.

Override the API "void emitTuples()" and create an output port of type MutablePair<Message,MutablePair<Long,Integer>> (Java generics need the boxed types Long and Integer rather than long and int).

Copy emitTuples() from AbstractKafkaInputOperator and change the line

    emitTuple(message.msg)

to

    outputPort.emit(new MutablePair<>(message.getMsg(),new MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));

Regards,
Chaitanya

On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

Hi

Does anyone know whether any of the existing Kafka input operators can retrieve the Kafka partition ID and offset that a particular message came from, along with the message itself?

Thanks a lot in advance.

Regards,
Raja.
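
For reference, the shape of the tuple suggested in the reply above (message paired with its offset and partition ID) can be sketched in plain Java. This is only an illustration that runs without Malhar on the classpath, not the operator itself: OffsetTaggingSketch, StubMessage, StubPort and emitWithPosition are hypothetical names invented here, and the JDK's AbstractMap.SimpleEntry stands in for MutablePair. The real subclass would extend AbstractKafkaInputOperator and emit on an actual output port as described in the reply.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OffsetTaggingSketch {

    // Hypothetical stand-in for the consumer's message wrapper:
    // a payload plus the Kafka offset and partition ID it was read from.
    public static final class StubMessage {
        public final String msg;
        public final long offset;
        public final int partitionId;

        public StubMessage(String msg, long offset, int partitionId) {
            this.msg = msg;
            this.offset = offset;
            this.partitionId = partitionId;
        }
    }

    // Hypothetical stand-in for an operator output port: it only records what is emitted.
    public static final class StubPort<T> {
        public final List<T> emitted = new ArrayList<>();

        public void emit(T tuple) {
            emitted.add(tuple);
        }
    }

    // Mirrors the changed line from the reply: emit (message, (offset, partitionId))
    // instead of the bare message. SimpleEntry is an assumption standing in for MutablePair.
    public static void emitWithPosition(
            StubMessage m,
            StubPort<Map.Entry<String, Map.Entry<Long, Integer>>> port) {
        // Generic type arguments must be reference types, so the primitives are boxed.
        Map.Entry<Long, Integer> position =
                new SimpleEntry<>(Long.valueOf(m.offset), Integer.valueOf(m.partitionId));
        port.emit(new SimpleEntry<>(m.msg, position));
    }

    public static void main(String[] args) {
        StubPort<Map.Entry<String, Map.Entry<Long, Integer>>> port = new StubPort<>();
        emitWithPosition(new StubMessage("hello", 42L, 3), port);

        // A downstream consumer unpacks the payload, then the offset and partition ID.
        Map.Entry<String, Map.Entry<Long, Integer>> tuple = port.emitted.get(0);
        System.out.println(tuple.getKey()
                + " offset=" + tuple.getValue().getKey()
                + " partition=" + tuple.getValue().getValue());
    }
}
```

A downstream operator would read the payload from the outer pair and the offset and partition ID from the inner pair, in the same order in which they were packed.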