Hi Raja,

I think you are using the 0.8 version of the Kafka operator. Malhar has no existing operator that emits the partition ID and offset along with each message. To meet your requirement, do the following:
Create a new class that extends AbstractKafkaInputOperator.

Override the API "void emitTuples()" and create an output port of type MutablePair<Message,MutablePair<Long,Integer>>.

Copy emitTuples() from AbstractKafkaInputOperator and change the line

    emitTuple(message.msg)

to

    outputPort.emit(new MutablePair<>(message.getMsg(), new MutablePair<>(message.getOffset(), message.getKafkaPart().getPartitionId())));

Regards,
Chaitanya

On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <[email protected]> wrote:
>
> Hi
>
> Does anyone have an idea, if any of the existing kafka input operators
> give the ability to retrieve kafka Partition ID & Offset a particular
> message came from, along with the messages ?
>
> Thanks a lot in advance.
>
> Regards,
> Raja.
>
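For reference, here is a rough sketch of the tuple shape the reply above describes: the message payload paired with an (offset, partition ID) pair. It does not use Malhar itself. Pair, KafkaRecord, toTuple and the class name are hypothetical stand-ins for MutablePair, the consumer's message object, and the changed line in emitTuples(), so it compiles without Malhar or commons-lang on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class OffsetTupleSketch {

    // Hypothetical stand-in for MutablePair; not the real commons-lang class.
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    // Hypothetical stand-in for the message object read inside emitTuples().
    static final class KafkaRecord {
        final byte[] payload;
        final long offset;
        final int partitionId;

        KafkaRecord(byte[] payload, long offset, int partitionId) {
            this.payload = payload;
            this.offset = offset;
            this.partitionId = partitionId;
        }
    }

    // Builds the nested tuple: payload on the left, (offset, partition ID) on the right.
    // The primitives are boxed to Long and Integer, since generics cannot hold long/int.
    static Pair<byte[], Pair<Long, Integer>> toTuple(KafkaRecord record) {
        Pair<Long, Integer> position = new Pair<>(record.offset, record.partitionId);
        return new Pair<>(record.payload, position);
    }

    public static void main(String[] args) {
        // Assumed sample data standing in for messages polled from Kafka.
        List<KafkaRecord> polled = new ArrayList<>();
        polled.add(new KafkaRecord("hello".getBytes(StandardCharsets.UTF_8), 42L, 3));
        polled.add(new KafkaRecord("world".getBytes(StandardCharsets.UTF_8), 43L, 3));

        // In the real operator this loop body would be the outputPort.emit(...) call.
        for (KafkaRecord record : polled) {
            Pair<byte[], Pair<Long, Integer>> tuple = toTuple(record);
            System.out.println("partition=" + tuple.right.right
                + " offset=" + tuple.right.left
                + " payload=" + new String(tuple.left, StandardCharsets.UTF_8));
        }
    }
}
```

A downstream operator would then read the offset from the inner pair's left side and the partition ID from its right side, matching the order used in the emit call.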
