Hi Priyanka,

Can this be done from a class outside the package 
“com.datatorrent.contrib.kafka”?

I don’t want to disturb the source :(



Regards,
Raja.

From: "Raja.Aravapalli" 
<[email protected]<mailto:[email protected]>>
Date: Friday, June 17, 2016 at 5:38 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator


Hi Priyanka,

I am using kafka version 0.8.x.

Awesome. Yes, this is what I want. I shall test this and share my updates. 
Having a Kafka operator like this in Malhar would be very good. I don’t 
see anything like it in Storm either!



Regards,
Raja.

From: Priyanka Gugale 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Friday, June 17, 2016 at 2:05 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator

Hi Raja,

I have quickly written an operator to fulfill your requirement. The code is 
available 
here<https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
 Let me know if this addresses your use case.

-Priyanka

On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale 
<[email protected]<mailto:[email protected]>> wrote:
Hi Raja,

You will need to update other places as well (I guess it's replay, in addition 
to emitTuples). But I think it is not feasible to replicate the emitTuples code 
in a subclass, as many of the parent class variables are private. I will try to 
figure out if there is any other way.

Can you please confirm which Kafka version you are using?

-Priyanka

On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Hi Chaitanya,

Would the changes you proposed below be enough to retrieve the partition & offset?

I see emitTuple(Message msg) is being called at various places in the code… 
please advise. Thank you.


Regards,
Raja.

From: "Raja.Aravapalli" 
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 14, 2016 at 9:50 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator


Thanks for the response Chaitanya. I will follow the suggestions to retrieve 
Kafka partitionId & offset!!


Regards,
Raja.

From: Chaitanya Chebolu 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, June 13, 2016 at 3:06 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator

Hi Raja,

   I think you are using the 0.8 version of the Kafka operator. There is no such 
operator in Malhar. To meet your requirement, please do as below:

  Create a new class which extends AbstractKafkaInputOperator. Override the 
API "void emitTuples()" and create the output port of type 
MutablePair<Message,MutablePair<Long,Integer>> (Java generics need the boxed 
types Long and Integer rather than long and int).

Copy the emitTuples() from AbstractKafkaInputOperator and change the below line:
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new 
MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
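
The substitution above could look roughly like the following self-contained 
sketch. Pair and ConsumedMessage are hypothetical stand-ins (for the commons-lang3 
MutablePair and for the operator's internal message holder) so that it compiles 
without Malhar on the classpath; the real class and accessor names should be 
checked against the Malhar source for the version in use.

```java
import java.nio.charset.StandardCharsets;

class PartitionOffsetSketch {

  /** Hypothetical stand-in for commons-lang3 MutablePair (assumption, not the real class). */
  public static final class Pair<L, R> {
    public final L left;
    public final R right;

    public Pair(L left, R right) {
      this.left = left;
      this.right = right;
    }
  }

  /** Hypothetical stand-in for the message holder the operator reads from its buffer. */
  public static final class ConsumedMessage {
    public final byte[] payload;
    public final long offset;
    public final int partitionId;

    public ConsumedMessage(byte[] payload, long offset, int partitionId) {
      this.payload = payload;
      this.offset = offset;
      this.partitionId = partitionId;
    }
  }

  /**
   * Builds the tuple that would be emitted on the output port:
   * the payload on the left, and (offset, partitionId) on the right.
   */
  public static Pair<byte[], Pair<Long, Integer>> toTuple(ConsumedMessage m) {
    // Primitives are boxed here because generic type arguments cannot be long/int.
    Pair<Long, Integer> position = new Pair<Long, Integer>(m.offset, m.partitionId);
    return new Pair<byte[], Pair<Long, Integer>>(m.payload, position);
  }

  public static void main(String[] args) {
    ConsumedMessage m =
        new ConsumedMessage("hello".getBytes(StandardCharsets.UTF_8), 42L, 3);
    Pair<byte[], Pair<Long, Integer>> tuple = toTuple(m);
    System.out.println(
        "offset=" + tuple.right.left + " partition=" + tuple.right.right);
  }
}
```

In the real operator, the call to toTuple(...) would sit where emitTuple(...) is 
called today, with the result passed to outputPort.emit(...).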

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Hi

Does anyone know whether any of the existing Kafka input operators can return 
the Kafka partition ID & offset that a particular message came from, along 
with the message itself?


Thanks a lot in advance.


Regards,
Raja.


