Thanks a lot Siyuan. It helped me understand better!!


So, can you please confirm: if I implement the OffsetManager interface, it will 
be used to load the initial starting position and to update the offset status 
(at some interval)?

Will the application latency greatly decrease if I use HDFS for storage?

Thank you very much.

Regards,
Raja.

From: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, June 6, 2016 at 7:13 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka offset commit

Raja,

Not exactly. Apex actually stores offsets as part of the operator state, and 
the state of the operator is checkpointed internally and periodically (in HDFS 
by default). For more details, you can read this: 
https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/

With that said, offsets are stored in HDFS along with the rest of the 
operator's state so that the operator can recover from a system failure.
Apex also supports stateful restart (starting the application by specifying 
the previous application id). It will initialize all operators, load the 
checkpointed state (offsets will be part of it) from HDFS, and continue 
running from that state. The only limitation is that you cannot easily tell 
where the current offsets are. Hope this answers your question.
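
To illustrate the idea of offsets living inside checkpointed operator state, 
here is a minimal sketch. It is not Apex code: ToyKafkaInput, checkpoint and 
restore are hypothetical names, and plain Java serialization to a byte array 
stands in for Apex's own checkpoint mechanism and for HDFS.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class CheckpointSketch {

  /** Toy operator (hypothetical): offsets are an ordinary field, so they are part of its state. */
  static class ToyKafkaInput implements Serializable {
    private static final long serialVersionUID = 1L;

    // partition id -> next offset to read
    HashMap<Integer, Long> nextOffsetByPartition = new HashMap<>();

    void consume(int partition, long offset) {
      nextOffsetByPartition.put(partition, offset + 1);
    }
  }

  /** Stand-in for a periodic checkpoint: snapshot the whole operator object. */
  static byte[] checkpoint(ToyKafkaInput op) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(op);
    }
    return bytes.toByteArray();
  }

  /** Stand-in for recovery or stateful restart: rebuild the operator from the snapshot. */
  static ToyKafkaInput restore(byte[] saved) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(saved))) {
      return (ToyKafkaInput) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    ToyKafkaInput op = new ToyKafkaInput();
    op.consume(0, 99);                 // next offset for partition 0 becomes 100
    byte[] saved = checkpoint(op);     // snapshot taken here
    op.consume(0, 100);                // progress made after the snapshot

    ToyKafkaInput recovered = restore(saved);
    // The recovered operator resumes from the snapshot, not from the later progress.
    System.out.println(recovered.nextOffsetByPartition.get(0)); // prints 100
  }
}
```

The snapshot is an opaque blob of the whole operator, which is also why the 
current offsets are not easy to inspect from outside.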

Regards,
Siyuan


On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Thanks Siyuan.

So, to confirm: Apex is not storing the offset status at any location, the way 
Storm stores it in ZooKeeper?


Regards,
Raja.

From: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, June 6, 2016 at 6:42 PM

To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka offset commit

Hey Raja,

For 0.8, you have to implement the OffsetManager interface on your own. The 
updateOffsets method is called in the application master every time it gets 
updated offsets from each physical partition, and the offsets that you see in 
the method are committed offsets. So you can safely save these offsets into 
either ZooKeeper (the 0.8.2 client has an API to do that) or any other 
datastore such as a DB or HDFS. You also have to implement the 
loadInitialOffsets method to load back the offsets you want.
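
A rough sketch of what such an implementation could look like, using a local 
file as the datastore. To keep it self-contained it does not use the Malhar 
classes: OffsetStore is a hypothetical stand-in that only mirrors the two 
methods named above, and partitions are keyed by a made-up "topic-partition" 
string. The real OffsetManager method signatures should be checked against 
the Malhar source.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FileOffsetStoreSketch {

  /** Hypothetical stand-in for the two callbacks described above (not the Malhar interface). */
  interface OffsetStore {
    Map<String, Long> loadInitialOffsets();

    void updateOffsets(Map<String, Long> committedOffsets);
  }

  /** Keeps the latest committed offset per partition as "key=offset" lines in a text file. */
  static class FileOffsetStore implements OffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
      this.file = file;
    }

    @Override
    public synchronized Map<String, Long> loadInitialOffsets() {
      Map<String, Long> offsets = new HashMap<>();
      if (!Files.exists(file)) {
        return offsets; // nothing saved yet: the caller decides where to start
      }
      try {
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
          int eq = line.lastIndexOf('=');
          if (eq > 0) {
            offsets.put(line.substring(0, eq), Long.parseLong(line.substring(eq + 1).trim()));
          }
        }
      } catch (IOException ex) {
        throw new UncheckedIOException(ex);
      }
      return offsets;
    }

    @Override
    public synchronized void updateOffsets(Map<String, Long> committedOffsets) {
      // Merge with what is already on disk so partitions missing from this update are kept.
      Map<String, Long> merged = new TreeMap<>(loadInitialOffsets());
      merged.putAll(committedOffsets);
      List<String> lines = new ArrayList<>();
      for (Map.Entry<String, Long> entry : merged.entrySet()) {
        lines.add(entry.getKey() + "=" + entry.getValue());
      }
      try {
        Files.write(file, lines, StandardCharsets.UTF_8);
      } catch (IOException ex) {
        throw new UncheckedIOException(ex);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("offsets", ".txt");

    Map<String, Long> committed = new HashMap<>();
    committed.put("events-0", 42L);
    new FileOffsetStore(file).updateOffsets(committed);

    // A fresh instance, as after a restart, reads back what was saved.
    System.out.println(new FileOffsetStore(file).loadInitialOffsets());
  }
}
```

Because the file is plain text, the saved offsets can be inspected or edited by 
hand before a restart, similar to what zookeeper-cli allows with Storm.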

You are welcome to contribute a default implementation of OffsetManager that 
uses the built-in Kafka offset commit request API!

Regards,
Siyuan

On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Hi Thomas,

We are still using a 0.8 cluster!!


Regards,
Raja.

From: Thomas Weise <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Monday, June 6, 2016 at 5:23 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka offset commit

Hi Raja,

Which Kafka version are you using?

With the new 0.9 connector there is no need for the offset manager:

https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka

Thanks,
Thomas


On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:
Hi

Can someone please help me understand where the offsets will be stored when 
consuming with “KafkaSinglePortStringInputOperator”?

And how should restarts be handled?


I worked with Storm earlier. Storm maintains the offsets in ZooKeeper, and a 
client id is maintained for every consumer, using which:

- we can see the current offset status for a given partition and modify it as 
well using zookeeper-cli
- restarts can be handled


As per the Apex documentation, I can see that restarts can be handled 
effectively using OffsetManager, but I couldn’t find any examples to refer to.

How can clientId be used to retrieve the offset status?
And is it possible to edit the offsets, etc.?

Can someone please help me find this?


Thanks a lot!!


-Regards,
Raja.





