[ https://issues.apache.org/jira/browse/STORM-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15284752#comment-15284752 ]
Robert Joseph Evans commented on STORM-1757:
--------------------------------------------

To make distributed checkpointing work we need more coordination between the sources and the bolts than ack/fail really offers. For this I would propose that each Unbounded Source run as a spout. All of the spouts would need to coordinate with one another so that the barriers can be emitted close together, reducing the amount of data that would need to be buffered. We would also need coordination for restoring checkpoints. I would propose ZooKeeper for both, simply because it is already available and we would not need to add anything new to base Storm to support it.

All of the spouts would elect a leader through ZooKeeper. The leader would then trigger all of the spouts to emit a barrier and checkpoint their spout metadata. Because we are potentially going to have multiple checkpoints outstanding at any point in time, we will need to label all of the checkpoints. I would label them with two numbers: the first is the bundle/batch number, the second is the replay, or generation, number. The bundle number would increment for each barrier emitted, but would roll back on failure. The generation number would increment on any failure. This would allow downstream bolts to restore a checkpoint just by seeing the bundle id.

Spouts would use acking to know whether a downstream tuple failed. If a tuple fails, the spout would inform the leader through ZooKeeper of the bad batch, and the leader would then tell all of the other spouts to restore and start again. Each spout would also have to inform the leader periodically when a batch is fully acked. Once all of the spouts have informed the leader that all of the tuples in a batch are acked, the leader can tell the spouts that they can delete the old checkpoints. They should also inform the downstream bolts as part of the barrier so they can clean up their old checkpoints too.
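The two-part checkpoint label described above could be sketched as a small value class. This is only an illustration of the proposed (bundle, generation) scheme; `CheckpointId` and its method names are hypothetical, not part of any Storm or Beam API.

```java
// Hypothetical sketch of the proposed checkpoint label: a bundle number that
// increments per barrier but rolls back on failure, plus a generation number
// that increments on every failure so replays supersede stale checkpoints.
public final class CheckpointId implements Comparable<CheckpointId> {
    private final long bundle;     // increments for each barrier emitted
    private final long generation; // increments on any failure/replay

    public CheckpointId(long bundle, long generation) {
        this.bundle = bundle;
        this.generation = generation;
    }

    /** Next barrier in the same generation (no failure seen). */
    public CheckpointId nextBundle() {
        return new CheckpointId(bundle + 1, generation);
    }

    /** On failure: roll the bundle back to the last good one, bump the generation. */
    public CheckpointId rollBackTo(long lastGoodBundle) {
        return new CheckpointId(lastGoodBundle, generation + 1);
    }

    /** Generation dominates, so a replayed bundle outranks an older attempt. */
    @Override
    public int compareTo(CheckpointId o) {
        int c = Long.compare(generation, o.generation);
        return c != 0 ? c : Long.compare(bundle, o.bundle);
    }

    public long bundle() { return bundle; }
    public long generation() { return generation; }

    @Override
    public String toString() { return bundle + "." + generation; }
}
```

Ordering generation before bundle in `compareTo` is what lets a downstream bolt decide, from the id alone, whether a barrier belongs to the current attempt or to a failed one that was rolled back.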
We can work out the exact details of how this will work later on. For the actual checkpointing in the bolts, only the GroupByKey transform would need to do anything, and for simplicity it would checkpoint each pane and key separately, so that the checkpoints are incremental and we can support very large windows without too much difficulty.

In general all of this seems totally doable, and actually not that difficult. My biggest concern by far is efficiency in the checkpointing, especially for large windows. The checkpointing is something we need to do, but in the common case the data should be thrown away, so we want to be sure we optimize for throwing the data away. We can easily write something backed by HBase or most any NoSQL store, but that is going to add a lot of iops and network load that I am not too thrilled about. Perhaps it does not really matter for an initial deployment, though.

> Apache Beam Runner for Storm
> ----------------------------
>
>           Key: STORM-1757
>           URL: https://issues.apache.org/jira/browse/STORM-1757
>       Project: Apache Storm
>    Issue Type: Brainstorming
>      Reporter: P. Taylor Goetz
>      Priority: Minor
>
> This is a call for interested parties to collaborate on an Apache Beam [1]
> runner for Storm, and express their thoughts and opinions.
> Given the addition of the Windowing API to Apache Storm, we should be able to
> map naturally to the Beam API. If not, it may be indicative of shortcomings
> of the Storm API that should be addressed.
> [1] http://beam.incubator.apache.org

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
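The incremental, per-key-and-per-pane GroupByKey checkpointing proposed in the comment above, and its optimize-for-discard concern, could be sketched as follows. This is a minimal in-memory illustration; `PaneCheckpointStore` and all of its methods are hypothetical names (a real store would sit behind HBase or another NoSQL backend, as the comment notes).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of an incremental GroupByKey checkpoint store: each
 * (key, pane) pair is checkpointed separately and tagged with its bundle id,
 * so very large windows never need one monolithic snapshot.
 */
public final class PaneCheckpointStore<V> {
    /** Composite slot id for one grouping key in one pane. */
    private static String slot(String key, long pane) {
        return key + "#" + pane;
    }

    /** slot -> (bundle id, checkpointed accumulator value). */
    private final Map<String, Map.Entry<Long, V>> store = new HashMap<>();

    /** Checkpoint one key/pane at the given bundle, replacing any older value. */
    public void put(String key, long pane, long bundle, V value) {
        store.put(slot(key, pane), Map.entry(bundle, value));
    }

    /** Restore one key/pane on replay, or null if never checkpointed. */
    public V restore(String key, long pane) {
        Map.Entry<Long, V> e = store.get(slot(key, pane));
        return e == null ? null : e.getValue();
    }

    /**
     * The common case: once the leader reports a bundle fully acked, every
     * checkpoint at or below it is dropped. Keeping this discard path cheap
     * is the efficiency concern raised in the comment above.
     */
    public void discardUpTo(long bundle) {
        store.values().removeIf(e -> e.getKey() <= bundle);
    }

    public int size() { return store.size(); }
}
```

Because each entry carries its bundle id, cleanup after a barrier is a single bulk discard rather than a per-window rewrite, which matters when the backing store charges iops for every mutation.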