kafka.message.Message is the problem, MutablePair has a no-arg constructor and should be serializable for Kryo,
On Sun, Jun 19, 2016 at 3:10 PM, <hsy...@gmail.com> wrote: > The Pairs in Apache common are not Kryo serializable. You can use other > pair data structure. For example KeyValuePair in Malhar library > > Siyuan > > Sent from my iPhone > > On Jun 19, 2016, at 14:58, Raja.Aravapalli <raja.aravapa...@target.com> > wrote: > > > Hi Priyanka, > > I am writing to read the messages in the next operator with input port > defined like the below, > > public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, > Integer>>> input = new DefaultInputPort<MutablePair<Message, > MutablePair<Long, Integer>>>() > > > Application is failing with below exception: > > 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec > (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: > Execution halted due to Kryo exception! > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing > no-arg constructor): kafka.message.Message > Serialization trace: > left (org.apache.commons.lang3.tuple.MutablePair) > at > com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) > at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) > at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) > > > > Any help please. > > Regards, > Raja. > > From: "Raja.Aravapalli" <raja.aravapa...@target.com> > Reply-To: "users@apex.apache.org" <users@apex.apache.org> > Date: Sunday, June 19, 2016 at 12:22 AM > To: "users@apex.apache.org" <users@apex.apache.org> > Subject: Re: Kafka input operator > > > Thanks for the response Priyanka… > > But, when I try to put in my own package, some of the protected variables > are not accessible!!!! > > > Regards, > Raja. > > From: Priyanka Gugale <priya...@datatorrent.com> > Reply-To: "users@apex.apache.org" <users@apex.apache.org> > Date: Saturday, June 18, 2016 at 10:29 AM > To: "users@apex.apache.org" <users@apex.apache.org> > Subject: Re: Kafka input operator > > Hi, > > Yes sure, you can use any package name you want. In fact better you put > this class outside Malhar jar. Just keep the Malhar jar in your class path. > > -Priyanka > On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <raja.aravapa...@target.com> > wrote: > >> >> Hi Priyanka, >> >> Can this be done from a class outside the package “ >> com.datatorrent.contrib.kafka;” ? >> >> I don’t want to disturb the source :( >> >> >> >> Regards, >> Raja. >> >> From: "Raja.Aravapalli" <raja.aravapa...@target.com> >> Date: Friday, June 17, 2016 at 5:38 AM >> To: "users@apex.apache.org" <users@apex.apache.org> >> Subject: Re: Kafka input operator >> >> >> Hi Priyanka, >> >> I am using kafka version 0.8.x. >> >> Awesome. Yes. This is what is want. I shall test this and share my >> updates. Having one kafka operator like this in Malhar, will be a very good >> one. I don’t see such availability in Storm as well!! >> >> >> >> Regards, >> Raja. >> >> From: Priyanka Gugale <priya...@datatorrent.com> >> Reply-To: "users@apex.apache.org" <users@apex.apache.org> >> Date: Friday, June 17, 2016 at 2:05 AM >> To: "users@apex.apache.org" <users@apex.apache.org> >> Subject: Re: Kafka input operator >> >> Hi Raja, >> >> I have quickly wrote an operator to fulfill your requirement. The code is >> available here >> <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>. >> Let me know if this addresses your usecase. >> >> -Priyanka >> >> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale < >> priya...@datatorrent.com> wrote: >> >>> Hi Raja, >>> >>> You will need to update other places as well (I guess it's replay other >>> than emitTuples) . But I think it is not feasible to replicate emitTuples >>> code in subclass as many of the parent class variables are private. I would >>> try to figure out if there is any other way. >>> >>> Can you please confirm which Kafka version you are using? >>> >>> -Priyanka >>> >>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli < >>> raja.aravapa...@target.com> wrote: >>> >>>> >>>> Hi Chaitanya, >>>> >>>> Would the below changes you proposed enough to retrieve partition & >>>> offset ? >>>> >>>> I see *emitTuple(Message msg) i*s being called at various places in >>>> the code… please advise. Thank you. >>>> >>>> >>>> Regards, >>>> Raja. >>>> >>>> From: "Raja.Aravapalli" <raja.aravapa...@target.com> >>>> Date: Tuesday, June 14, 2016 at 9:50 PM >>>> To: "users@apex.apache.org" <users@apex.apache.org> >>>> Subject: Re: Kafka input operator >>>> >>>> >>>> Thanks for the response Chaitanya. I will follow the suggestions to >>>> retrieve Kafka partitionId & offset!! >>>> >>>> >>>> Regards, >>>> Raja. >>>> >>>> From: Chaitanya Chebolu <chaita...@datatorrent.com> >>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org> >>>> Date: Monday, June 13, 2016 at 3:06 AM >>>> To: "users@apex.apache.org" <users@apex.apache.org> >>>> Subject: Re: Kafka input operator >>>> >>>> Hi Raja, >>>> >>>> I think you are using 0.8 version of kafka operator. There is no >>>> such operator in Malhar. To meet your requirement, please do as below: >>>> >>>> Create a new class which extend from AbstractKafkaInputOperator. >>>> Override the API "void emitTuples()" and create the output port of type >>>> MutablePair<Message,MutablePair<long,int>> >>>> >>>> Copy the emitTuples() from AbstractKafkaInputOperator and change the >>>> below line: >>>> emitTuple(message.msg) to >>>> outputPort.emit(new MutablePair<>(message.getMsg(),new >>>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId()))); >>>> >>>> Regards, >>>> Chaitanya >>>> >>>> >>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli < >>>> raja.aravapa...@target.com> wrote: >>>> >>>>> >>>>> Hi >>>>> >>>>> Does anyone have an idea, if any of the existing kafka input operators >>>>> give the ability to retrieve kafka Partition ID & Offset a particular >>>>> message came from, along with the messages ? >>>>> >>>>> >>>>> Thanks a lot in advance. >>>>> >>>>> >>>>> Regards, >>>>> Raja. >>>>> >>>> >>>> >>> >>