Re: kafka offset commit

2016-06-06 Thread Devendra Tagare
Hi, I started work on an offset manager for Kafka 0.8.x some time back, but it was left midway. This implementation uses Kafka topics to store offsets (similar to the 0.9 implementation): https://github.com/apache/apex-malhar/pull/156 If the community is using it, I can incorporate the comments

Re: kafka offset commit

2016-06-06 Thread hsy...@gmail.com
Raja, Not exactly. Apex stores offsets as part of the operator state, and the operator state is checkpointed internally and periodically (in HDFS by default). For more details, you can read this: https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/ With that said,
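
To illustrate the idea of offsets living inside checkpointed operator state, here is a minimal sketch. It is not Apex code: `OffsetTrackingReader` is a hypothetical class, and plain Java serialization to a byte array stands in for whatever Apex does when it writes a checkpoint to HDFS. It only shows that restoring a snapshot brings the offsets back to where they were when the snapshot was taken.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an input operator; this is NOT an Apex class.
class OffsetTrackingReader implements Serializable {
    private static final long serialVersionUID = 1L;

    // Partition id -> next offset to read. Because this is an ordinary field,
    // it is captured whenever the object itself is snapshotted.
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    // Remember that the message at `offset` was consumed from `partition`.
    void recordConsumed(int partition, long offset) {
        nextOffsets.put(partition, offset + 1);
    }

    // Where reading should resume for `partition` (0 if nothing was consumed yet).
    long nextOffset(int partition) {
        return nextOffsets.getOrDefault(partition, 0L);
    }

    // Assumption: Java serialization to a byte array plays the role of a checkpoint.
    byte[] checkpoint() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        } catch (IOException e) {
            throw new IllegalStateException("snapshot failed", e);
        }
        return bytes.toByteArray();
    }

    // Rebuild the reader, including its offsets, from a snapshot.
    static OffsetTrackingReader restore(byte[] snapshot) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (OffsetTrackingReader) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("restore failed", e);
        }
    }
}
```

In this sketch, anything consumed after the snapshot is forgotten on restore, so reading would resume from the offsets recorded in the snapshot.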

Re: kafka offset commit

2016-06-06 Thread Raja . Aravapalli
Thanks Siyuan. So, to confirm, Apex is not storing offset status at any location, the way Storm stores it in ZooKeeper? Regards, Raja. From: "hsy...@gmail.com" > Reply-To:

Re: kafka offset commit

2016-06-06 Thread Raja . Aravapalli
Hi Thomas, We are still using a 0.8 cluster! Regards, Raja. From: Thomas Weise > Reply-To: "users@apex.apache.org" > Date: Monday, June 6, 2016 at 5:23 PM

Re: kafka offset commit

2016-06-06 Thread Thomas Weise
Hi Raja, Which Kafka version are you using? With the new 0.9 connector there is no need for the offset manager: https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka Thanks, Thomas On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli

Re: avro deserialization fails when using kafka

2016-06-06 Thread Thomas Weise
Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it. Thanks, Thomas On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath wrote: > >
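
A minimal sketch of the pattern described above, assuming nothing about the Apex API: `DecodingStage` is a hypothetical class, its "decoder" is just a UTF-8 string decoder rather than an Avro one, and Java serialization stands in for a checkpoint. The decoder field is `transient` and is rebuilt in `setup()`, while the counter is ordinary state that survives the round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical operator-like class; this is NOT an Apex class.
class DecodingStage implements Serializable {
    private static final long serialVersionUID = 1L;

    // Ordinary field: included in the snapshot.
    private long tuplesSeen;

    // Helper object: transient, so it is skipped by the snapshot and must be
    // recreated in setup() after every restore.
    private transient Function<byte[], String> decoder;

    // Called once before processing starts, and again after a restore.
    void setup() {
        decoder = bytes -> new String(bytes, StandardCharsets.UTF_8);
    }

    String process(byte[] tuple) {
        tuplesSeen++;
        return decoder.apply(tuple);
    }

    long tuplesSeen() {
        return tuplesSeen;
    }

    boolean hasDecoder() {
        return decoder != null;
    }

    // Assumption: a serialize/deserialize round trip plays the role of
    // checkpoint followed by recovery.
    static DecodingStage roundTrip(DecodingStage stage) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(stage);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DecodingStage) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

In this sketch, dropping the `transient` keyword would make the round trip fail under Java serialization, because the lambda assigned to `decoder` is not serializable.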

avro deserialization fails when using kafka

2016-06-06 Thread Raja . Aravapalli
Hi, I am trying to read data from Kafka, and my input in Kafka is Avro messages. So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit records from Kafka. In the next operator I am reading the input as "byte[]" and deserializing the message. But the tuple deserialization is