Hi Priyanka, Can this be done from a class outside the package “com.datatorrent.contrib.kafka;” ?
I don’t want to disturb the source :( Regards, Raja. From: "Raja.Aravapalli" <[email protected]<mailto:[email protected]>> Date: Friday, June 17, 2016 at 5:38 AM To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Subject: Re: Kafka input operator Hi Priyanka, I am using kafka version 0.8.x. Awesome. Yes. This is what is want. I shall test this and share my updates. Having one kafka operator like this in Malhar, will be a very good one. I don’t see such availability in Storm as well!! Regards, Raja. From: Priyanka Gugale <[email protected]<mailto:[email protected]>> Reply-To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Date: Friday, June 17, 2016 at 2:05 AM To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Subject: Re: Kafka input operator Hi Raja, I have quickly wrote an operator to fulfill your requirement. The code is available here<https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>. Let me know if this addresses your usecase. -Priyanka On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <[email protected]<mailto:[email protected]>> wrote: Hi Raja, You will need to update other places as well (I guess it's replay other than emitTuples) . But I think it is not feasible to replicate emitTuples code in subclass as many of the parent class variables are private. I would try to figure out if there is any other way. Can you please confirm which Kafka version you are using? -Priyanka On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <[email protected]<mailto:[email protected]>> wrote: Hi Chaitanya, Would the below changes you proposed enough to retrieve partition & offset ? I see emitTuple(Message msg) is being called at various places in the code… please advise. Thank you. Regards, Raja. From: "Raja.Aravapalli" <[email protected]<mailto:[email protected]>> Date: Tuesday, June 14, 2016 at 9:50 PM To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Subject: Re: Kafka input operator Thanks for the response Chaitanya. I will follow the suggestions to retrieve Kafka partitionId & offset!! Regards, Raja. From: Chaitanya Chebolu <[email protected]<mailto:[email protected]>> Reply-To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Date: Monday, June 13, 2016 at 3:06 AM To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Subject: Re: Kafka input operator Hi Raja, I think you are using 0.8 version of kafka operator. There is no such operator in Malhar. To meet your requirement, please do as below: Create a new class which extend from AbstractKafkaInputOperator. Override the API "void emitTuples()" and create the output port of type MutablePair<Message,MutablePair<long,int>> Copy the emitTuples() from AbstractKafkaInputOperator and change the below line: emitTuple(message.msg) to outputPort.emit(new MutablePair<>(message.getMsg(),new MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId()))); Regards, Chaitanya On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <[email protected]<mailto:[email protected]>> wrote: Hi Does anyone have an idea, if any of the existing kafka input operators give the ability to retrieve kafka Partition ID & Offset a particular message came from, along with the messages ? Thanks a lot in advance. Regards, Raja.
