The Pairs in Apache common are not Kryo serializable. You can use other pair data structure. For example KeyValuePair in Malhar library
Siyuan Sent from my iPhone > On Jun 19, 2016, at 14:58, Raja.Aravapalli <[email protected]> wrote: > > > Hi Priyanka, > > I am writing to read the messages in the next operator with input port > defined like the below, > > public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, > Integer>>> input = new DefaultInputPort<MutablePair<Message, > MutablePair<Long, Integer>>>() > > Application is failing with below exception: > > 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec > (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: > Execution halted due to Kryo exception! > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing > no-arg constructor): kafka.message.Message > Serialization trace: > left (org.apache.commons.lang3.tuple.MutablePair) > at > com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) > at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) > > > Any help please. > > Regards, > Raja. > > From: "Raja.Aravapalli" <[email protected]> > Reply-To: "[email protected]" <[email protected]> > Date: Sunday, June 19, 2016 at 12:22 AM > To: "[email protected]" <[email protected]> > Subject: Re: Kafka input operator > > > Thanks for the response Priyanka… > > But, when I try to put in my own package, some of the protected variables are > not accessible!!!! > > > Regards, > Raja. > > From: Priyanka Gugale <[email protected]> > Reply-To: "[email protected]" <[email protected]> > Date: Saturday, June 18, 2016 at 10:29 AM > To: "[email protected]" <[email protected]> > Subject: Re: Kafka input operator > > Hi, > > Yes sure, you can use any package name you want. In fact better you put this > class outside Malhar jar. Just keep the Malhar jar in your class path. > > -Priyanka > >> On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <[email protected]> >> wrote: >> >> Hi Priyanka, >> >> Can this be done from a class outside the package >> “com.datatorrent.contrib.kafka;” ? >> >> I don’t want to disturb the source :( >> >> >> >> Regards, >> Raja. >> >> From: "Raja.Aravapalli" <[email protected]> >> Date: Friday, June 17, 2016 at 5:38 AM >> To: "[email protected]" <[email protected]> >> Subject: Re: Kafka input operator >> >> >> Hi Priyanka, >> >> I am using kafka version 0.8.x. >> >> Awesome. Yes. This is what is want. I shall test this and share my updates. >> Having one kafka operator like this in Malhar, will be a very good one. I >> don’t see such availability in Storm as well!! >> >> >> >> Regards, >> Raja. >> >> From: Priyanka Gugale <[email protected]> >> Reply-To: "[email protected]" <[email protected]> >> Date: Friday, June 17, 2016 at 2:05 AM >> To: "[email protected]" <[email protected]> >> Subject: Re: Kafka input operator >> >> Hi Raja, >> >> I have quickly wrote an operator to fulfill your requirement. The code is >> available here. Let me know if this addresses your usecase. >> >> -Priyanka >> >>> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale >>> <[email protected]> wrote: >>> Hi Raja, >>> >>> You will need to update other places as well (I guess it's replay other >>> than emitTuples) . But I think it is not feasible to replicate emitTuples >>> code in subclass as many of the parent class variables are private. I would >>> try to figure out if there is any other way. >>> >>> Can you please confirm which Kafka version you are using? >>> >>> -Priyanka >>> >>>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli >>>> <[email protected]> wrote: >>>> >>>> Hi Chaitanya, >>>> >>>> Would the below changes you proposed enough to retrieve partition & offset >>>> ? >>>> >>>> I see emitTuple(Message msg) is being called at various places in the >>>> code… please advise. Thank you. >>>> >>>> >>>> Regards, >>>> Raja. >>>> >>>> From: "Raja.Aravapalli" <[email protected]> >>>> Date: Tuesday, June 14, 2016 at 9:50 PM >>>> To: "[email protected]" <[email protected]> >>>> Subject: Re: Kafka input operator >>>> >>>> >>>> Thanks for the response Chaitanya. I will follow the suggestions to >>>> retrieve Kafka partitionId & offset!! >>>> >>>> >>>> Regards, >>>> Raja. >>>> >>>> From: Chaitanya Chebolu <[email protected]> >>>> Reply-To: "[email protected]" <[email protected]> >>>> Date: Monday, June 13, 2016 at 3:06 AM >>>> To: "[email protected]" <[email protected]> >>>> Subject: Re: Kafka input operator >>>> >>>> Hi Raja, >>>> >>>> I think you are using 0.8 version of kafka operator. There is no such >>>> operator in Malhar. To meet your requirement, please do as below: >>>> >>>> Create a new class which extend from AbstractKafkaInputOperator. >>>> Override the API "void emitTuples()" and create the output port of type >>>> MutablePair<Message,MutablePair<long,int>> >>>> >>>> Copy the emitTuples() from AbstractKafkaInputOperator and change the below >>>> line: >>>> emitTuple(message.msg) to >>>> outputPort.emit(new MutablePair<>(message.getMsg(),new >>>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId()))); >>>> >>>> Regards, >>>> Chaitanya >>>> >>>> >>>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli >>>>> <[email protected]> wrote: >>>>> >>>>> Hi >>>>> >>>>> Does anyone have an idea, if any of the existing kafka input operators >>>>> give the ability to retrieve kafka Partition ID & Offset a particular >>>>> message came from, along with the messages ? >>>>> >>>>> >>>>> Thanks a lot in advance. >>>>> >>>>> >>>>> Regards, >>>>> Raja. >>>> >>> >>
