The Pairs in Apache common are not Kryo serializable. You can use other pair 
data structure. For example KeyValuePair in Malhar library 

Siyuan

Sent from my iPhone

> On Jun 19, 2016, at 14:58, Raja.Aravapalli <[email protected]> wrote:
> 
> 
> Hi Priyanka, 
> 
> I am writing to read the messages in the next operator with input port 
> defined like the below, 
> 
> public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, 
> Integer>>> input = new DefaultInputPort<MutablePair<Message, 
> MutablePair<Long, Integer>>>()
> 
> Application is failing with below exception:
> 
> 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec 
> (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: 
> Execution halted due to Kryo exception!
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing 
> no-arg constructor): kafka.message.Message
> Serialization trace:
> left (org.apache.commons.lang3.tuple.MutablePair)
>       at 
> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>       at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>       at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> 
> 
> Any help please. 
> 
> Regards, 
> Raja.
> 
> From: "Raja.Aravapalli" <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Sunday, June 19, 2016 at 12:22 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: Kafka input operator
> 
> 
> Thanks for the response Priyanka…
> 
> But, when I try to put in my own package, some of the protected variables are 
> not accessible!!!! 
> 
> 
> Regards,
> Raja.
> 
> From: Priyanka Gugale <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Saturday, June 18, 2016 at 10:29 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: Kafka input operator
> 
> Hi,
> 
> Yes sure, you can use any package name you want. In fact better you put this 
> class outside Malhar jar. Just keep the Malhar jar in your class path.
> 
> -Priyanka
> 
>> On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <[email protected]> 
>> wrote:
>> 
>> Hi Priyanka, 
>> 
>> Can this be done from a class outside the package 
>> “com.datatorrent.contrib.kafka;” ?
>> 
>> I don’t want to disturb the source :(
>> 
>> 
>> 
>> Regards, 
>> Raja.
>> 
>> From: "Raja.Aravapalli" <[email protected]>
>> Date: Friday, June 17, 2016 at 5:38 AM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Kafka input operator
>> 
>> 
>> Hi Priyanka, 
>> 
>> I am using kafka version 0.8.x.
>> 
>> Awesome. Yes. This is what is want. I shall test this and share my updates. 
>> Having one kafka operator like this in Malhar, will be a very good one. I 
>> don’t see such availability in Storm as well!!
>> 
>> 
>> 
>> Regards, 
>> Raja.
>> 
>> From: Priyanka Gugale <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Friday, June 17, 2016 at 2:05 AM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Kafka input operator
>> 
>> Hi Raja,
>> 
>> I have quickly wrote an operator to fulfill your requirement. The code is 
>> available here. Let me know if this addresses your usecase.
>> 
>> -Priyanka
>> 
>>> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale 
>>> <[email protected]> wrote:
>>> Hi Raja,
>>> 
>>> You will need to update other places as well (I guess it's replay other 
>>> than emitTuples) . But I think it is not feasible to replicate emitTuples 
>>> code in subclass as many of the parent class variables are private. I would 
>>> try to figure out if there is any other way.
>>> 
>>> Can you please confirm which Kafka version you are using?
>>> 
>>> -Priyanka
>>> 
>>>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli 
>>>> <[email protected]> wrote:
>>>> 
>>>> Hi Chaitanya, 
>>>> 
>>>> Would the below changes you proposed enough to retrieve partition & offset 
>>>> ?
>>>> 
>>>> I see emitTuple(Message msg) is being called at various places in the 
>>>> code… please advise. Thank you.
>>>> 
>>>> 
>>>> Regards,
>>>> Raja.
>>>> 
>>>> From: "Raja.Aravapalli" <[email protected]>
>>>> Date: Tuesday, June 14, 2016 at 9:50 PM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: Kafka input operator
>>>> 
>>>> 
>>>> Thanks for the response Chaitanya. I will follow the suggestions to 
>>>> retrieve Kafka partitionId & offset!! 
>>>> 
>>>> 
>>>> Regards,
>>>> Raja.
>>>> 
>>>> From: Chaitanya Chebolu <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Monday, June 13, 2016 at 3:06 AM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: Kafka input operator
>>>> 
>>>> Hi Raja,
>>>> 
>>>>    I think you are using 0.8 version of kafka operator. There is no such 
>>>> operator in Malhar.  To meet your requirement, please do as below:
>>>> 
>>>>   Create a new class which extend from AbstractKafkaInputOperator. 
>>>> Override the API "void emitTuples()" and create the output port of type 
>>>> MutablePair<Message,MutablePair<long,int>>
>>>> 
>>>> Copy the emitTuples() from AbstractKafkaInputOperator and change the below 
>>>> line:
>>>> emitTuple(message.msg) to
>>>> outputPort.emit(new MutablePair<>(message.getMsg(),new 
>>>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
>>>> 
>>>> Regards,
>>>> Chaitanya
>>>> 
>>>> 
>>>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli 
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Hi
>>>>> 
>>>>> Does anyone have an idea, if any of the existing kafka input operators 
>>>>> give the ability to retrieve  kafka Partition ID & Offset a particular 
>>>>> message came from, along with the messages ?
>>>>> 
>>>>> 
>>>>> Thanks a lot in advance. 
>>>>> 
>>>>> 
>>>>> Regards,
>>>>> Raja.
>>>> 
>>> 
>> 

Reply via email to