Thanks for the response Chaitanya. I will follow the suggestions to retrieve 
Kafka partitionId & offset!!


Regards,
Raja.

From: Chaitanya Chebolu 
<chaita...@datatorrent.com<mailto:chaita...@datatorrent.com>>
Reply-To: "users@apex.apache.org<mailto:users@apex.apache.org>" 
<users@apex.apache.org<mailto:users@apex.apache.org>>
Date: Monday, June 13, 2016 at 3:06 AM
To: "users@apex.apache.org<mailto:users@apex.apache.org>" 
<users@apex.apache.org<mailto:users@apex.apache.org>>
Subject: Re: Kafka input operator

Hi Raja,

   I think you are using 0.8 version of kafka operator. There is no such 
operator in Malhar.  To meet your requirement, please do as below:

  Create a new class which extend from AbstractKafkaInputOperator. Override the 
API "void emitTuples()" and create the output port of type 
MutablePair<Message,MutablePair<long,int>>

Copy the emitTuples() from AbstractKafkaInputOperator and change the below line:
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new 
MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli 
<raja.aravapa...@target.com<mailto:raja.aravapa...@target.com>> wrote:

Hi

Does anyone have an idea, if any of the existing kafka input operators give the 
ability to retrieve  kafka Partition ID & Offset a particular message came 
from, along with the messages ?


Thanks a lot in advance.


Regards,
Raja.

Reply via email to