Hi Chaitanya,

Would the changes you proposed below be enough to retrieve the partition & offset?

I see that emitTuple(Message msg) is called in several places in the code… 
Please advise. Thank you.


Regards,
Raja.

From: "Raja.Aravapalli" 
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 14, 2016 at 9:50 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator


Thanks for the response, Chaitanya. I will follow your suggestions to retrieve 
the Kafka partition ID & offset!


Regards,
Raja.

From: Chaitanya Chebolu 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, June 13, 2016 at 3:06 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator

Hi Raja,

   I think you are using the 0.8 version of the Kafka operator. No existing 
operator in Malhar does this. To meet your requirement, please do the following:

  Create a new class that extends AbstractKafkaInputOperator. Override the 
method "void emitTuples()" and create an output port of type 
MutablePair<Message,MutablePair<Long,Integer>> (Java generics need the boxed 
types Long and Integer rather than long and int).

Copy emitTuples() from AbstractKafkaInputOperator and change the line
emitTuple(message.msg)
to
outputPort.emit(new MutablePair<>(message.getMsg(),
    new MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
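
For illustration, here is a minimal, self-contained sketch of the tuple shape 
described above. It does not use Malhar or Kafka: Pair, FakeKafkaMessage and 
CollectingPort are hypothetical stand-ins (for MutablePair, the consumer's 
message wrapper and the output port), so it only shows how the payload gets 
paired with (offset, partitionId), not the real operator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pair type such as MutablePair.
final class Pair<L, R> {
  final L left;
  final R right;

  Pair(L left, R right) {
    this.left = left;
    this.right = right;
  }
}

// Hypothetical stand-in for the consumer's message wrapper:
// the payload plus the offset and partition it was read from.
final class FakeKafkaMessage {
  final String payload;
  final long offset;
  final int partitionId;

  FakeKafkaMessage(String payload, long offset, int partitionId) {
    this.payload = payload;
    this.offset = offset;
    this.partitionId = partitionId;
  }
}

// Hypothetical stand-in for an output port: it just collects emitted tuples.
final class CollectingPort<T> {
  final List<T> emitted = new ArrayList<>();

  void emit(T tuple) {
    emitted.add(tuple);
  }
}

class PartitionOffsetSketch {
  // Builds payload -> (offset, partitionId). The type arguments are the boxed
  // Long and Integer because generics cannot take primitives.
  static Pair<String, Pair<Long, Integer>> toTuple(FakeKafkaMessage m) {
    return new Pair<>(m.payload, new Pair<Long, Integer>(m.offset, m.partitionId));
  }

  public static void main(String[] args) {
    CollectingPort<Pair<String, Pair<Long, Integer>>> outputPort = new CollectingPort<>();

    // In the real operator this would happen inside the copied emitTuples() loop.
    outputPort.emit(toTuple(new FakeKafkaMessage("hello", 42L, 3)));

    for (Pair<String, Pair<Long, Integer>> t : outputPort.emitted) {
      System.out.println(t.left + " offset=" + t.right.left + " partition=" + t.right.right);
    }
  }
}
```

A downstream operator would then read the offset and partition from the inner 
pair of each tuple it receives.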

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Hi

Does anyone know whether any of the existing Kafka input operators can provide 
the Kafka partition ID & offset that a particular message came from, along with 
the message itself?


Thanks a lot in advance.


Regards,
Raja.
