Hi Priyanka,
I am writing to read the messages in the next operator with input port defined
like the below,
public transient DefaultInputPort<MutablePair<Message, MutablePair<Long,
Integer>>> input = new DefaultInputPort<MutablePair<Message, MutablePair<Long,
Integer>>>()
Application is failing with below exception:
2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec
(DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error:
Execution halted due to Kryo exception!
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
no-arg constructor): kafka.message.Message
Serialization trace:
left (org.apache.commons.lang3.tuple.MutablePair)
at
com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
Any help please.
Regards,
Raja.
From: "Raja.Aravapalli"
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Sunday, June 19, 2016 at 12:22 AM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator
Thanks for the response Priyanka…
But, when I try to put in my own package, some of the protected variables are
not accessible!!!!
Regards,
Raja.
From: Priyanka Gugale
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Saturday, June 18, 2016 at 10:29 AM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator
Hi,
Yes sure, you can use any package name you want. In fact better you put this
class outside Malhar jar. Just keep the Malhar jar in your class path.
-Priyanka
On Jun 17, 2016 8:03 PM, "Raja.Aravapalli"
<[email protected]<mailto:[email protected]>> wrote:
Hi Priyanka,
Can this be done from a class outside the package
“com.datatorrent.contrib.kafka;” ?
I don’t want to disturb the source :(
Regards,
Raja.
From: "Raja.Aravapalli"
<[email protected]<mailto:[email protected]>>
Date: Friday, June 17, 2016 at 5:38 AM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator
Hi Priyanka,
I am using kafka version 0.8.x.
Awesome. Yes. This is what is want. I shall test this and share my updates.
Having one kafka operator like this in Malhar, will be a very good one. I don’t
see such availability in Storm as well!!
Regards,
Raja.
From: Priyanka Gugale
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Friday, June 17, 2016 at 2:05 AM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator
Hi Raja,
I have quickly wrote an operator to fulfill your requirement. The code is
available
here<https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
Let me know if this addresses your usecase.
-Priyanka
On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale
<[email protected]<mailto:[email protected]>> wrote:
Hi Raja,
You will need to update other places as well (I guess it's replay other than
emitTuples) . But I think it is not feasible to replicate emitTuples code in
subclass as many of the parent class variables are private. I would try to
figure out if there is any other way.
Can you please confirm which Kafka version you are using?
-Priyanka
On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli
<[email protected]<mailto:[email protected]>> wrote:
Hi Chaitanya,
Would the below changes you proposed enough to retrieve partition & offset ?
I see emitTuple(Message msg) is being called at various places in the code…
please advise. Thank you.
Regards,
Raja.
From: "Raja.Aravapalli"
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 14, 2016 at 9:50 PM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator
Thanks for the response Chaitanya. I will follow the suggestions to retrieve
Kafka partitionId & offset!!
Regards,
Raja.
From: Chaitanya Chebolu
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Date: Monday, June 13, 2016 at 3:06 AM
To: "[email protected]<mailto:[email protected]>"
<[email protected]<mailto:[email protected]>>
Subject: Re: Kafka input operator
Hi Raja,
I think you are using 0.8 version of kafka operator. There is no such
operator in Malhar. To meet your requirement, please do as below:
Create a new class which extend from AbstractKafkaInputOperator. Override the
API "void emitTuples()" and create the output port of type
MutablePair<Message,MutablePair<long,int>>
Copy the emitTuples() from AbstractKafkaInputOperator and change the below line:
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new
MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
Regards,
Chaitanya
On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli
<[email protected]<mailto:[email protected]>> wrote:
Hi
Does anyone have an idea, if any of the existing kafka input operators give the
ability to retrieve kafka Partition ID & Offset a particular message came
from, along with the messages ?
Thanks a lot in advance.
Regards,
Raja.