Hi Raja,

Yes, I think if you implement the interface and set it as a property of the input operator, it should serve the purpose.
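Here is a minimal Java sketch of such an implementation, for illustration only. It uses a simplified, hypothetical interface (`SimpleOffsetManager`) that mirrors only the two methods discussed in this thread, `loadInitialOffsets` and `updateOffsets`. Partitions are keyed by hypothetical `"topic:partition"` strings. The real Malhar `OffsetManager` uses its own partition type, so check its exact signatures before adapting this. A local properties file stands in for ZooKeeper, a DB or HDFS. Wiring the class into the input operator is not shown.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical stand-in for the OffsetManager contract described in this thread.
 * Keys are "topic:partition" strings (an assumption, not the Malhar partition type).
 */
interface SimpleOffsetManager {
  /** Called at startup to supply the starting offset for each partition. */
  Map<String, Long> loadInitialOffsets();

  /** Called with committed offsets, which are therefore safe to persist. */
  void updateOffsets(Map<String, Long> offsetsOfPartitions);
}

/** Persists offsets to a local properties file (stand-in for ZooKeeper, a DB or HDFS). */
class FileOffsetManager implements SimpleOffsetManager {
  private final Path file;
  private final Map<String, Long> current = new HashMap<>();

  FileOffsetManager(Path file) {
    this.file = file;
  }

  @Override
  public Map<String, Long> loadInitialOffsets() {
    current.clear();
    if (Files.exists(file)) {
      Properties props = new Properties();
      try (Reader in = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
        props.load(in);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      for (String key : props.stringPropertyNames()) {
        current.put(key, Long.parseLong(props.getProperty(key)));
      }
    }
    // Return a copy so callers cannot mutate the manager's internal state.
    return new HashMap<>(current);
  }

  @Override
  public void updateOffsets(Map<String, Long> offsetsOfPartitions) {
    // Merge: a single call may carry offsets for only some of the partitions.
    current.putAll(offsetsOfPartitions);
    Properties props = new Properties();
    current.forEach((k, v) -> props.setProperty(k, Long.toString(v)));
    Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
    try {
      try (Writer out = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
        props.store(out, "kafka offsets");
      }
      // Write-then-rename so a crash never leaves a half-written offsets file
      // (assumes the file system supports atomic rename within a directory).
      Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

`updateOffsets` only ever receives committed offsets, so the merged map can be written out as-is. A later `loadInitialOffsets` (for example after a restart) then returns the last saved position for each partition.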
I don't think it would be a bottleneck, since it is just a small list of offsets (numbers) and it is only updated once every checkpoint interval.

Regards,
Siyuan

On Mon, Jun 6, 2016 at 5:43 PM, Raja.Aravapalli <[email protected]> wrote:

> Thanks a lot Siyuan. It helped me understand better!!
>
> So, can you please confirm: if I implement the OffsetManager interface, will it be used to load the initial starting position and to update the offset status [at some interval]?
>
> Will the application latency be greatly affected if I use HDFS for storage?
>
> Thank you very much.
>
> Regards,
> Raja.
>
> From: "[email protected]" <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Monday, June 6, 2016 at 7:13 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka offset commit
>
> Raja,
>
> Not exactly. Apex actually stores offsets as part of the operator state, and the operator state is checkpointed internally and periodically (in HDFS by default). For more details, you can read this:
> https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/
>
> With that said, offsets are stored in HDFS along with the other state of the operator, so that it can recover in case of any system failure.
> Also, in Apex you can do a stateful restart (start the application by specifying the previous application id). It will initialize all operators, load the checkpointed state (offsets will be part of it) from HDFS, and continue running from that state. The only limitation is that you cannot easily tell where the current offsets are. Hope this answered your question.
>
> Regards,
> Siyuan
>
>
> On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli <[email protected]> wrote:
>
>> Thanks Siyuan.
>>
>> So, to confirm, Apex is not storing the offset status at any location? Like how Storm stores it in Zookeeper?
>>
>> Regards,
>> Raja.
>>
>> From: "[email protected]" <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Monday, June 6, 2016 at 6:42 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: kafka offset commit
>>
>> Hey Raja,
>>
>> For 0.8, you have to implement the OffsetManager interface on your own. The updateOffsets method will be called in the application master every time it gets updated offsets from each physical partition. The offsets that you see in the method are committed offsets, so you can safely save them into either Zookeeper (the 0.8.2 client has an API to do that) or any other datastore like a DB or HDFS. You also have to implement the loadInitialOffsets method to load back the offsets you want.
>>
>> You are welcome to contribute a default implementation of OffsetManager that uses the built-in Kafka offset commit request API!
>>
>> Regards,
>> Siyuan
>>
>> On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <[email protected]> wrote:
>>
>>> Hi Thomas,
>>>
>>> We are still using a 0.8 cluster!!
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Monday, June 6, 2016 at 5:23 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka offset commit
>>>
>>> Hi Raja,
>>>
>>> Which Kafka version are you using?
>>>
>>> With the new 0.9 connector there is no need for the offset manager:
>>>
>>> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <[email protected]> wrote:
>>>
>>>> Hi
>>>>
>>>> Can someone please help me understand where the offsets will be stored when consuming with “*KafkaSinglePortStringInputOperator*”?
>>>>
>>>> And how should restarts be handled?
>>>>
>>>> I worked with Storm earlier. Storm maintains the offsets in Zookeeper, and a client id is maintained for every consumer, using which:
>>>>
>>>> - we can see the current offset status for a given partition & modify it as well using zookeeper-cli!!
>>>> - restarts can be handled
>>>>
>>>> As per the Apex documentation, I can see that restarts can be handled effectively using OffsetManager, but I couldn't find any examples to refer to…
>>>>
>>>> How can the clientId be used to retrieve the offset status?
>>>> And is it possible to edit the offsets, etc.?
>>>>
>>>> Can someone please help me find this?
>>>>
>>>> Thanks a lot!!
>>>>
>>>> -Regards,
>>>> Raja.
