Hi Raja, I have opened a pull request <https://github.com/apache/apex-malhar/pull/323/files> in Malhar to add emitTuple method to AbstractKafkaInputOperator which has KafkaMessage parameter. The KafkaMessage has both partitionId and offset along with the message. Once this gets merged, you can override* only* emitTuple method in your subclass and convert "Message" into String or bytes array. So now you won't have variable access problem as well as serialization issues.
-Priyanka On Mon, Jun 20, 2016 at 3:59 AM, Thomas Weise <[email protected]> wrote: > kafka.message.Message is the problem, MutablePair has a no-arg constructor > and should be serializable for Kryo, > > > On Sun, Jun 19, 2016 at 3:10 PM, <[email protected]> wrote: > >> The Pairs in Apache common are not Kryo serializable. You can use other >> pair data structure. For example KeyValuePair in Malhar library >> >> Siyuan >> >> Sent from my iPhone >> >> On Jun 19, 2016, at 14:58, Raja.Aravapalli <[email protected]> >> wrote: >> >> >> Hi Priyanka, >> >> I am writing to read the messages in the next operator with input port >> defined like the below, >> >> public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, >> Integer>>> input = new DefaultInputPort<MutablePair<Message, >> MutablePair<Long, Integer>>>() >> >> >> Application is failing with below exception: >> >> 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec >> (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic >> Error: Execution halted due to Kryo exception! >> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing >> no-arg constructor): kafka.message.Message >> Serialization trace: >> left (org.apache.commons.lang3.tuple.MutablePair) >> at >> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) >> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) >> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) >> >> >> >> Any help please. >> >> Regards, >> Raja. >> >> From: "Raja.Aravapalli" <[email protected]> >> Reply-To: "[email protected]" <[email protected]> >> Date: Sunday, June 19, 2016 at 12:22 AM >> To: "[email protected]" <[email protected]> >> Subject: Re: Kafka input operator >> >> >> Thanks for the response Priyanka… >> >> But, when I try to put in my own package, some of the protected variables >> are not accessible!!!! >> >> >> Regards, >> Raja. >> >> From: Priyanka Gugale <[email protected]> >> Reply-To: "[email protected]" <[email protected]> >> Date: Saturday, June 18, 2016 at 10:29 AM >> To: "[email protected]" <[email protected]> >> Subject: Re: Kafka input operator >> >> Hi, >> >> Yes sure, you can use any package name you want. In fact better you put >> this class outside Malhar jar. Just keep the Malhar jar in your class path. >> >> -Priyanka >> On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <[email protected]> >> wrote: >> >>> >>> Hi Priyanka, >>> >>> Can this be done from a class outside the package “ >>> com.datatorrent.contrib.kafka;” ? >>> >>> I don’t want to disturb the source :( >>> >>> >>> >>> Regards, >>> Raja. >>> >>> From: "Raja.Aravapalli" <[email protected]> >>> Date: Friday, June 17, 2016 at 5:38 AM >>> To: "[email protected]" <[email protected]> >>> Subject: Re: Kafka input operator >>> >>> >>> Hi Priyanka, >>> >>> I am using kafka version 0.8.x. >>> >>> Awesome. Yes. This is what is want. I shall test this and share my >>> updates. Having one kafka operator like this in Malhar, will be a very good >>> one. I don’t see such availability in Storm as well!! >>> >>> >>> >>> Regards, >>> Raja. >>> >>> From: Priyanka Gugale <[email protected]> >>> Reply-To: "[email protected]" <[email protected]> >>> Date: Friday, June 17, 2016 at 2:05 AM >>> To: "[email protected]" <[email protected]> >>> Subject: Re: Kafka input operator >>> >>> Hi Raja, >>> >>> I have quickly wrote an operator to fulfill your requirement. The code >>> is available here >>> <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>. >>> Let me know if this addresses your usecase. >>> >>> -Priyanka >>> >>> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale < >>> [email protected]> wrote: >>> >>>> Hi Raja, >>>> >>>> You will need to update other places as well (I guess it's replay other >>>> than emitTuples) . But I think it is not feasible to replicate emitTuples >>>> code in subclass as many of the parent class variables are private. I would >>>> try to figure out if there is any other way. >>>> >>>> Can you please confirm which Kafka version you are using? >>>> >>>> -Priyanka >>>> >>>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli < >>>> [email protected]> wrote: >>>> >>>>> >>>>> Hi Chaitanya, >>>>> >>>>> Would the below changes you proposed enough to retrieve partition & >>>>> offset ? >>>>> >>>>> I see *emitTuple(Message msg) i*s being called at various places in >>>>> the code… please advise. Thank you. >>>>> >>>>> >>>>> Regards, >>>>> Raja. >>>>> >>>>> From: "Raja.Aravapalli" <[email protected]> >>>>> Date: Tuesday, June 14, 2016 at 9:50 PM >>>>> To: "[email protected]" <[email protected]> >>>>> Subject: Re: Kafka input operator >>>>> >>>>> >>>>> Thanks for the response Chaitanya. I will follow the suggestions to >>>>> retrieve Kafka partitionId & offset!! >>>>> >>>>> >>>>> Regards, >>>>> Raja. >>>>> >>>>> From: Chaitanya Chebolu <[email protected]> >>>>> Reply-To: "[email protected]" <[email protected]> >>>>> Date: Monday, June 13, 2016 at 3:06 AM >>>>> To: "[email protected]" <[email protected]> >>>>> Subject: Re: Kafka input operator >>>>> >>>>> Hi Raja, >>>>> >>>>> I think you are using 0.8 version of kafka operator. There is no >>>>> such operator in Malhar. To meet your requirement, please do as below: >>>>> >>>>> Create a new class which extend from AbstractKafkaInputOperator. >>>>> Override the API "void emitTuples()" and create the output port of type >>>>> MutablePair<Message,MutablePair<long,int>> >>>>> >>>>> Copy the emitTuples() from AbstractKafkaInputOperator and change the >>>>> below line: >>>>> emitTuple(message.msg) to >>>>> outputPort.emit(new MutablePair<>(message.getMsg(),new >>>>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId()))); >>>>> >>>>> Regards, >>>>> Chaitanya >>>>> >>>>> >>>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli < >>>>> [email protected]> wrote: >>>>> >>>>>> >>>>>> Hi >>>>>> >>>>>> Does anyone have an idea, if any of the existing kafka input >>>>>> operators give the ability to retrieve kafka Partition ID & Offset a >>>>>> particular message came from, along with the messages ? >>>>>> >>>>>> >>>>>> Thanks a lot in advance. >>>>>> >>>>>> >>>>>> Regards, >>>>>> Raja. >>>>>> >>>>> >>>>> >>>> >>> >
