[ https://issues.apache.org/jira/browse/STORM-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15284752#comment-15284752 ]

Robert Joseph Evans commented on STORM-1757:
--------------------------------------------

To make distributed checkpointing work we need more coordination between the 
Sources and the bolts than ack/fail really offers.  For this I would propose 
that we have each Unbounded Source run as a spout.  All of the spouts would 
need to coordinate with each other so that the barriers can be emitted close to 
one another, reducing the amount of data that would need to be buffered.  We 
would also need coordination for restoring checkpoints.  For both of these I 
would propose that we use ZooKeeper, simply because it is already available 
and we would not need to add anything new to base Storm to support it.
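
To make the spout side a little more concrete, here is a minimal sketch against 
the Storm 1.x spout API of what a spout wrapping an Unbounded Source could look 
like, with barriers emitted on a dedicated control stream.  The class name, the 
stream names, and the ZooKeeper trigger are just assumptions for illustration, 
not settled design.

{code:java}
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical sketch: a spout that wraps an Unbounded Source and emits
// barrier tuples on a dedicated control stream when the leader asks for one.
public class UnboundedSourceSpout extends BaseRichSpout {
    public static final String DATA_STREAM = "data";
    public static final String BARRIER_STREAM = "barrier";

    private SpoutOutputCollector collector;
    // Set by a (not shown) ZooKeeper watch when the leader triggers a checkpoint.
    private volatile long pendingBundle = -1;
    private volatile long generation = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // Here we would also connect to ZooKeeper, join the leader election,
        // and register the watch that sets pendingBundle/generation.
    }

    @Override
    public void nextTuple() {
        long bundle = pendingBundle;
        if (bundle >= 0) {
            pendingBundle = -1;
            // Checkpoint the source metadata here, then emit the barrier so
            // that downstream bolts snapshot their own state for this bundle.
            collector.emit(BARRIER_STREAM, new Values(bundle, generation));
            return;
        }
        // Otherwise pull the next record from the wrapped Unbounded Source,
        // anchor it with a message id for ack/fail, and emit it on DATA_STREAM.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(DATA_STREAM, new Fields("value"));
        declarer.declareStream(BARRIER_STREAM, new Fields("bundle", "generation"));
    }
}
{code}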

All of the spouts would elect a leader through ZooKeeper.  The leader would 
then trigger all of the spouts to emit a barrier and checkpoint the spout 
metadata.  Because we are potentially going to have multiple checkpoints 
outstanding at any point in time, we will need to label all of the checkpoints.  
I would label them with two numbers.  The first would be the bundle/batch 
number, the second would be the replay, or generation, number.  The bundle 
number would increment for each barrier emitted, but would roll back on 
failure.  The generation number would increment on any failure.  This would 
allow downstream bolts to restore a checkpoint just by seeing the bundle id.
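
For illustration, here is a minimal sketch of what the two-part label could 
look like.  The class and method names are hypothetical; only the (bundle, 
generation) scheme itself is part of the proposal.

{code:java}
// Hypothetical sketch of the two-part checkpoint label.
public final class CheckpointId implements Comparable<CheckpointId> {
    public final long bundle;      // increments per barrier, rolls back on failure
    public final long generation;  // increments on every failure/replay

    public CheckpointId(long bundle, long generation) {
        this.bundle = bundle;
        this.generation = generation;
    }

    /** Next barrier within the same generation. */
    public CheckpointId nextBundle() {
        return new CheckpointId(bundle + 1, generation);
    }

    /** After a failure: roll the bundle back to the last good checkpoint and bump the generation. */
    public CheckpointId restartFrom(long lastGoodBundle) {
        return new CheckpointId(lastGoodBundle, generation + 1);
    }

    /** Generation dominates, so replayed bundles supersede the failed ones. */
    @Override
    public int compareTo(CheckpointId other) {
        int byGeneration = Long.compare(generation, other.generation);
        return byGeneration != 0 ? byGeneration : Long.compare(bundle, other.bundle);
    }

    @Override
    public String toString() {
        return bundle + "." + generation;
    }
}
{code}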

Spouts would have acking enabled so they know if a downstream tuple failed.  If 
an item fails, the spout would inform the leader through ZooKeeper of the bad 
batch.  The leader would then inform all of the other spouts to restore and 
start again.
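
Below is a minimal, in-memory sketch of the leader side of that failure 
handling, reusing the CheckpointId sketch above.  In the actual design the 
failure report and the restore command would travel through ZooKeeper; the 
SpoutHandle interface and the class name are hypothetical.

{code:java}
import java.util.List;

// Hypothetical sketch of the leader's decision when any spout reports a failure.
public class CheckpointLeader {
    private final List<SpoutHandle> spouts;   // hypothetical handles to all spouts
    private long lastFullyAckedBundle = -1;   // highest bundle acked by every spout
    private long generation = 0;

    public CheckpointLeader(List<SpoutHandle> spouts) {
        this.spouts = spouts;
    }

    /** Called (via ZooKeeper in practice) when a spout reports a failed tuple in a bundle. */
    public synchronized void onBundleFailed(long failedBundle) {
        generation++;                               // new generation for the replay
        long restoreBundle = lastFullyAckedBundle;  // roll back to the last good checkpoint
        for (SpoutHandle spout : spouts) {
            spout.restore(new CheckpointId(restoreBundle, generation));
        }
    }

    /** Hypothetical interface for telling a spout to restore a checkpoint and restart. */
    public interface SpoutHandle {
        void restore(CheckpointId restorePoint);
    }
}
{code}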

Each spout would also have to inform the leader periodically when a batch is 
fully acked.  Once all of the spouts have informed the leader that a batch is 
fully acked, the leader can inform the spouts that they can delete the older 
checkpoints.  The spouts should also inform the downstream bolts as part of the 
barrier so they can clean up their old checkpoints too.
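
Here is a minimal sketch of the bookkeeping the leader could use to decide when 
old checkpoints can be deleted.  Again, the reports would really arrive through 
ZooKeeper, and the class and method names are hypothetical.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: leader-side tracking of which bundles are fully acked
// by every spout, so older checkpoints can be garbage collected.
public class CheckpointCleanupTracker {
    private final int spoutCount;
    private final Map<Long, Integer> acksPerBundle = new HashMap<>();
    private long lastFullyAckedBundle = -1;

    public CheckpointCleanupTracker(int spoutCount) {
        this.spoutCount = spoutCount;
    }

    /**
     * A spout periodically reports that a bundle is fully acked from its point of view.
     * Returns the bundle up to which checkpoints may be deleted (inclusive), or -1 if none yet.
     */
    public synchronized long onBundleAcked(long bundle) {
        int acks = acksPerBundle.merge(bundle, 1, Integer::sum);
        if (acks == spoutCount && bundle == lastFullyAckedBundle + 1) {
            // Advance over any later bundles that are already fully acked as well.
            long b = bundle;
            while (acksPerBundle.getOrDefault(b, 0) == spoutCount) {
                acksPerBundle.remove(b);
                lastFullyAckedBundle = b;
                b++;
            }
        }
        // The spouts would piggyback this watermark on the next barrier so that
        // downstream bolts can drop their checkpoints for older bundles too.
        return lastFullyAckedBundle;
    }
}
{code}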

We can work out the exact details of how this will work later on.

For the actual checkpointing in the bolts, only the GroupByKey transform would 
need to do anything, and for simplicity it would checkpoint each pane and key 
separately, so that the checkpoints are incremental and so that we can support 
very large windows without too much difficulty.
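
A minimal sketch of what that per-(key, pane) incremental checkpointing could 
look like inside the GroupByKey bolt is below; the CheckpointStore interface 
and all of the names are hypothetical, and it reuses the CheckpointId sketch 
from above.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: buffer elements per key and pane, and on each barrier
// checkpoint only the entries touched since the last barrier.
public class GroupByKeyState<K, V> {
    /** Hypothetical key-value checkpoint store (could be HBase or another NoSQL store). */
    public interface CheckpointStore<K, V> {
        void write(CheckpointId checkpoint, K key, String paneId, List<V> values);
        void deleteUpTo(long bundle);   // cleanup once a bundle is fully acked everywhere
    }

    private final Map<K, Map<String, List<V>>> buffered = new HashMap<>();
    private final Map<K, Set<String>> dirty = new HashMap<>();
    private final CheckpointStore<K, V> store;

    public GroupByKeyState(CheckpointStore<K, V> store) {
        this.store = store;
    }

    /** Buffer an element into the pane for its key and mark that (key, pane) dirty. */
    public void add(K key, String paneId, V value) {
        buffered.computeIfAbsent(key, k -> new HashMap<>())
                .computeIfAbsent(paneId, p -> new ArrayList<>())
                .add(value);
        dirty.computeIfAbsent(key, k -> new HashSet<>()).add(paneId);
    }

    /** On a barrier: checkpoint only the keys and panes touched since the last barrier. */
    public void onBarrier(CheckpointId checkpoint) {
        for (Map.Entry<K, Set<String>> entry : dirty.entrySet()) {
            for (String paneId : entry.getValue()) {
                store.write(checkpoint, entry.getKey(), paneId,
                            buffered.get(entry.getKey()).get(paneId));
            }
        }
        dirty.clear();
    }
}
{code}

Because only the dirty entries are written at each barrier, the checkpoints 
stay incremental, and deleteUpTo is the hook for throwing data away once a 
bundle is fully acked everywhere, which ties into the efficiency concern below.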

In general all of this seems totally doable, and actually not that difficult.  
My biggest concern by far is around efficiency in the checkpointing, 
especially for large windows.  The checkpointing is something that we need to 
do, but in the common case the checkpointed data will simply be thrown away, so 
we want to be sure that we optimize for throwing the data away.  We can easily 
write something that can be backed by HBase or most any NoSQL store, but that 
is going to add a lot of IOPS and network load that I am not too thrilled 
about.  Perhaps it does not really matter for an initial deployment, though.

> Apache Beam Runner for Storm
> ----------------------------
>
>                 Key: STORM-1757
>                 URL: https://issues.apache.org/jira/browse/STORM-1757
>             Project: Apache Storm
>          Issue Type: Brainstorming
>            Reporter: P. Taylor Goetz
>            Priority: Minor
>
> This is a call for interested parties to collaborate on an Apache Beam [1] 
> runner for Storm, and express their thoughts and opinions.
> Given the addition of the Windowing API to Apache Storm, we should be able to 
> map naturally to the Beam API. If not, it may be indicative of shortcomings 
> of the Storm API that should be addressed.
> [1] http://beam.incubator.apache.org



