Hi Raja,

You will need to update other places as well (I guess replay, in addition to emitTuples). However, I don't think it is feasible to replicate the emitTuples code in a subclass, as many of the parent class variables are private. I will try to figure out whether there is another way.
Can you please confirm which Kafka version you are using?

-Priyanka

On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <[email protected]> wrote:

> Hi Chaitanya,
>
> Would the changes you proposed below be enough to retrieve partition &
> offset?
>
> I see emitTuple(Message msg) is being called at various places in the
> code... please advise. Thank you.
>
> Regards,
> Raja.
>
> From: "Raja.Aravapalli" <[email protected]>
> Date: Tuesday, June 14, 2016 at 9:50 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: Kafka input operator
>
> Thanks for the response Chaitanya. I will follow the suggestions to
> retrieve Kafka partitionId & offset!!
>
> Regards,
> Raja.
>
> From: Chaitanya Chebolu <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Monday, June 13, 2016 at 3:06 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: Kafka input operator
>
> Hi Raja,
>
> I think you are using the 0.8 version of the kafka operator. There is no
> such operator in Malhar. To meet your requirement, please do as below:
>
> Create a new class which extends AbstractKafkaInputOperator.
> Override the API "void emitTuples()" and create the output port of type
> MutablePair<Message,MutablePair<Long,Integer>>
>
> Copy the emitTuples() from AbstractKafkaInputOperator and change the below
> line:
> emitTuple(message.msg) to
> outputPort.emit(new MutablePair<>(message.getMsg(),new
> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
>
> Regards,
> Chaitanya
>
> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <[email protected]> wrote:
>
>> Hi
>>
>> Does anyone have an idea whether any of the existing kafka input
>> operators give the ability to retrieve the kafka Partition ID & Offset
>> a particular message came from, along with the messages?
>>
>> Thanks a lot in advance.
>>
>> Regards,
>> Raja.
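
For reference, the tuple shape Chaitanya suggests (message paired with its offset and partition ID) can be sketched roughly as below. This is only an illustration under stated assumptions: it uses stand-in types so that it compiles without Malhar or Kafka on the classpath. KafkaRecord, Pair, wrap and PartitionOffsetSketch are hypothetical names, not Malhar API. In a real operator the pair would presumably be the MutablePair mentioned in the thread, emitted from an output port. Note that Java generics need the boxed types Long and Integer rather than long and int.

```java
import java.nio.charset.StandardCharsets;

final class PartitionOffsetSketch {

    // Hypothetical stand-in for the consumer-side message wrapper (not a Malhar class).
    static final class KafkaRecord {
        final byte[] payload;
        final long offset;
        final int partitionId;

        KafkaRecord(byte[] payload, long offset, int partitionId) {
            this.payload = payload;
            this.offset = offset;
            this.partitionId = partitionId;
        }
    }

    // Minimal generic pair, standing in for a library pair type such as MutablePair.
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    // Builds the (message, (offset, partitionId)) tuple described in the thread.
    // The primitive long and int are boxed to Long and Integer here.
    static Pair<byte[], Pair<Long, Integer>> wrap(KafkaRecord record) {
        Pair<Long, Integer> position = new Pair<>(record.offset, record.partitionId);
        return new Pair<>(record.payload, position);
    }

    public static void main(String[] args) {
        KafkaRecord record =
            new KafkaRecord("hello".getBytes(StandardCharsets.UTF_8), 42L, 3);
        Pair<byte[], Pair<Long, Integer>> tuple = wrap(record);
        System.out.println("offset=" + tuple.right.left
            + " partition=" + tuple.right.right);
    }
}
```

A downstream consumer of such a tuple reads the payload from the outer pair and the offset and partition ID from the inner pair. Whether this can be wired into emitTuples() in a subclass depends on the private-field issue raised at the top of this thread.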
