Hi Raja,

I have quickly written an operator to fulfill your requirement. The code is available here <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>. Let me know if this addresses your use case.
-Priyanka

On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <[email protected]> wrote:

> Hi Raja,
>
> You will need to update other places as well (I guess it's replay other
> than emitTuples) . But I think it is not feasible to replicate emitTuples
> code in subclass as many of the parent class variables are private. I would
> try to figure out if there is any other way.
>
> Can you please confirm which Kafka version you are using?
>
> -Priyanka
>
> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>> Hi Chaitanya,
>>
>> Would the below changes you proposed enough to retrieve partition &
>> offset ?
>>
>> I see *emitTuple(Message msg)* is being called at various places in the
>> code… please advise. Thank you.
>>
>>
>> Regards,
>> Raja.
>>
>> From: "Raja.Aravapalli" <[email protected]>
>> Date: Tuesday, June 14, 2016 at 9:50 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Kafka input operator
>>
>>
>> Thanks for the response Chaitanya. I will follow the suggestions to
>> retrieve Kafka partitionId & offset!!
>>
>>
>> Regards,
>> Raja.
>>
>> From: Chaitanya Chebolu <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Monday, June 13, 2016 at 3:06 AM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Kafka input operator
>>
>> Hi Raja,
>>
>> I think you are using 0.8 version of kafka operator. There is no such
>> operator in Malhar. To meet your requirement, please do as below:
>>
>> Create a new class which extend from AbstractKafkaInputOperator.
>> Override the API "void emitTuples()" and create the output port of type
>> MutablePair<Message,MutablePair<long,int>>
>>
>> Copy the emitTuples() from AbstractKafkaInputOperator and change the
>> below line:
>> emitTuple(message.msg) to
>> outputPort.emit(new MutablePair<>(message.getMsg(),new
>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
>>
>> Regards,
>> Chaitanya
>>
>>
>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>> Hi
>>>
>>> Does anyone have an idea, if any of the existing kafka input operators
>>> give the ability to retrieve kafka Partition ID & Offset a particular
>>> message came from, along with the messages ?
>>>
>>>
>>> Thanks a lot in advance.
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>
>>
>
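For anyone following the thread, here is a rough, self-contained sketch of the idea Chaitanya describes: emit each message together with its offset and partition id as a nested pair. It does not use the Malhar or Kafka classes. `KafkaRecord`, `Pair`, `OutputPort` and `PartitionOffsetSketch` are hypothetical stand-ins invented for illustration, and the payload is a plain `String` rather than a Kafka `Message`. One thing the sketch does reflect about real Java: type arguments cannot be primitives, so the inner pair has to be declared with `Long` and `Integer` rather than `long` and `int`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: every type here is a hypothetical stand-in,
// not the Apex Malhar or Kafka API.
final class PartitionOffsetSketch {

    /** Stand-in for a consumed message plus the metadata we want to keep. */
    static final class KafkaRecord {
        final String payload;
        final long offset;
        final int partitionId;

        KafkaRecord(String payload, long offset, int partitionId) {
            this.payload = payload;
            this.offset = offset;
            this.partitionId = partitionId;
        }
    }

    /** Minimal pair type playing the role of MutablePair in the thread. */
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    /** Stand-in for an output port: it just collects what is emitted. */
    static final class OutputPort<T> {
        private final List<T> emitted = new ArrayList<>();

        void emit(T tuple) {
            emitted.add(tuple);
        }

        List<T> emitted() {
            return emitted;
        }
    }

    // Generic type arguments must be boxed types (Long, Integer).
    final OutputPort<Pair<String, Pair<Long, Integer>>> outputPort = new OutputPort<>();

    /** Emits (payload, (offset, partitionId)) for each pending record. */
    void emitTuples(List<KafkaRecord> pending) {
        for (KafkaRecord record : pending) {
            Pair<Long, Integer> position = new Pair<>(record.offset, record.partitionId);
            outputPort.emit(new Pair<>(record.payload, position));
        }
    }

    public static void main(String[] args) {
        PartitionOffsetSketch operator = new PartitionOffsetSketch();
        operator.emitTuples(Arrays.asList(
                new KafkaRecord("first", 42L, 3),
                new KafkaRecord("second", 43L, 3)));
        for (Pair<String, Pair<Long, Integer>> tuple : operator.outputPort.emitted()) {
            System.out.println(tuple.left + " offset=" + tuple.right.left
                    + " partition=" + tuple.right.right);
        }
    }
}
```

A downstream consumer would read the payload from `left` and the offset and partition id from `right.left` and `right.right`. Whether the same change can be made in a subclass of AbstractKafkaInputOperator depends on the point raised above about private parent-class fields, which this sketch does not address.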
