Hi Val,

Thank you for the feedback. Yes, instead of using checkpointing, we can store intermediate snapshots of the results directly in Ignite caches.
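The sketch below makes storing intermediate snapshots in caches concrete, under stated assumptions. Each pipeline step's output is stored under a key made of a pipeline id and a step name. A retried run reuses the stored outputs instead of recomputing them. `SnapshotStore`, `ResumableRunner` and `Step` are hypothetical names, not runner or Ignite classes. The `ConcurrentHashMap` is a local stand-in for an `IgniteCache`; with Ignite, one could be obtained with `ignite.getOrCreateCache(...)`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

/**
 * Hypothetical store for intermediate results, keyed by pipeline id and step
 * name. The ConcurrentHashMap is a local stand-in for an IgniteCache.
 */
final class SnapshotStore<V> {
    private final Map<String, V> cache = new ConcurrentHashMap<>();

    private static String key(String pipelineId, String step) {
        return pipelineId + "/" + step;
    }

    /** Records the output of a completed step. */
    void save(String pipelineId, String step, V result) {
        cache.put(key(pipelineId, step), result);
    }

    /** Returns the stored output of a step, if that step already completed. */
    Optional<V> load(String pipelineId, String step) {
        return Optional.ofNullable(cache.get(key(pipelineId, step)));
    }
}

/** Runs steps in order, reusing outputs saved by an earlier (failed) attempt. */
final class ResumableRunner {
    static final class Step {
        final String name;
        final UnaryOperator<Integer> fn;

        Step(String name, UnaryOperator<Integer> fn) {
            this.name = name;
            this.fn = fn;
        }
    }

    private final SnapshotStore<Integer> store;
    int executedSteps = 0; // steps actually computed, across all attempts

    ResumableRunner(SnapshotStore<Integer> store) {
        this.store = store;
    }

    int run(String pipelineId, int input, List<Step> steps) {
        int value = input;
        for (Step step : steps) {
            Optional<Integer> saved = store.load(pipelineId, step.name);
            if (saved.isPresent()) {
                value = saved.get(); // snapshot found: skip recomputation
            } else {
                value = step.fn.apply(value); // may throw; nothing is saved then
                executedSteps++;
                store.save(pipelineId, step.name, value);
            }
        }
        return value;
    }
}
```

If the second step fails, a rerun with the same pipeline id only recomputes that step. This keeps recovery state in ordinary cache entries rather than in the checkpointing SPI.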
Also, underlying support for the exactly-once guarantee in the Ignite core module would be great, and we could use it for Ignite Compute.

Regards,
Saikat

On Wed, Aug 19, 2020 at 4:13 PM Valentin Kulichenko <valentin.kuliche...@gmail.com> wrote:

> Hi Saikat,
>
> Makes sense. Note that the checkpointing API is a candidate for removal in
> Ignite 3.0 - it's better to store intermediate results directly in Ignite
> caches.
>
> Also, my feeling is that simple checkpointing might not be enough for the
> integration, especially if we want to pursue the exactly-once guarantee. I
> will create a separate thread on the Ignite dev list to discuss how we can
> add such support.
>
> -Val
>
> On Tue, Aug 18, 2020 at 6:46 PM Saikat Maitra <saikat.mai...@gmail.com>
> wrote:
>
>> Hi Val,
>>
>> Thank you for your response. I like the idea of a reactive event-based
>> processing engine for fault tolerance. As you mentioned, it will be up to
>> the underlying system to manage job execution and offer fault tolerance,
>> and we will need to build this into the Ignite compute execution model.
>>
>> I looked into the Flink and Samza runners, and they both offer fault
>> tolerance using a checkpointing mechanism:
>>
>> Flink - Fault-tolerance with *exactly-once* processing guarantees [1]
>> Samza - Fault-tolerance with support for incremental checkpointing of
>> state instead of full snapshots. This enables Samza to scale to
>> applications with very large state. [2]
>>
>> I will look further into how we can implement checkpointing [3] for
>> Ignite compute jobs when running a Beam pipeline.
>>
>> [1] https://beam.apache.org/documentation/runners/flink/
>> [2] https://beam.apache.org/documentation/runners/samza/
>> [3] https://apacheignite.readme.io/docs/checkpointing
>>
>> Regards,
>> Saikat
>>
>> On Tue, Aug 18, 2020 at 7:29 PM Valentin Kulichenko <
>> valentin.kuliche...@gmail.com> wrote:
>>
>> > Hi Saikat,
>> >
>> > Thanks for clarifying.
>> > Is there a Beam component that monitors the state, or is this up to the
>> > application? If something fails, will the application have to retry the
>> > whole pipeline?
>> >
>> > My concern is that Ignite compute actually provides very limited
>> > guarantees, especially for async execution. There are some failover
>> > mechanisms, but overall it's up to the application to track the state and
>> > retry. Moreover, if the application fails, all jobs it has submitted are
>> > canceled.
>> >
>> > I'm thinking that Ignite should have a reactive event-based processing
>> > engine. The basic idea is this:
>> > - an application submits an event into the cluster
>> > - the event is persisted in Ignite to be eventually processed
>> > - a processed event may result in some new events that are submitted in
>> >   a similar fashion
>> >
>> > Ignite will provide the at-least-once guarantee (or even exactly-once
>> > under certain assumptions) for all the event handlers, so a user can
>> > create a whole chain by submitting a single event, and they don't have to
>> > worry about failures - it's up to Ignite to handle them.
>> >
>> > It seems to me that it might be beneficial for the Beam runner to have
>> > such an engine under the hood. What do you think?
>> >
>> > -Val
>> >
>> > On Mon, Aug 17, 2020 at 6:00 PM Saikat Maitra <saikat.mai...@gmail.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> Luke - Thank you for sharing the details of the portability layer for
>> >> Flink, Samza and Spark. I will look into them and will reach out if I
>> >> have any questions.
>> >>
>> >> Val - Thank you for your response. Yes, I am planning to run the Beam
>> >> pipeline on the Ignite compute engine asynchronously. Here is sample
>> >> code for the run method.
>> >>
>> >>     IgnitePipelineResult pipelineResult = new IgnitePipelineResult(job,
>> >>         metricsAccumulator);
>> >>     // runAsync() replaces the deprecated withAsync() API; assumes `job`
>> >>     // is an IgniteRunnable wrapping the translated pipeline.
>> >>     IgniteFuture<Void> computeFuture = ignite.compute().runAsync(job);
>> >>     // On completion, record the outcome and release resources.
>> >>     computeFuture.listen(f -> {
>> >>         Throwable failure = null;
>> >>         try {
>> >>             f.get(); // rethrows the job's failure, if any
>> >>         } catch (IgniteException e) {
>> >>             failure = e;
>> >>         }
>> >>         pipelineResult.freeze(failure);
>> >>         metricsAccumulator.destroy();
>> >>         ignite.close(); // Ignite exposes close(), not shutdown()
>> >>     });
>> >>     pipelineResult.setComputeFuture(computeFuture);
>> >>
>> >>     return pipelineResult;
>> >>
>> >> My understanding is that, for failover scenarios, we will need to map the
>> >> job state from the Ignite-known state to the Beam job state, as is done
>> >> in JetPipelineResult:
>> >>
>> >> https://github.com/apache/beam/blob/master/runners/jet/src/main/java/org/apache/beam/runners/jet/JetPipelineResult.java#L68-L90
>> >>
>> >> Regards,
>> >> Saikat
>> >>
>> >> On Mon, Aug 17, 2020 at 2:27 PM Valentin Kulichenko <
>> >> valentin.kuliche...@gmail.com> wrote:
>> >>
>> >> > Hi Saikat,
>> >> >
>> >> > This sounds very interesting - I've been thinking about how the Ignite
>> >> > compute engine could be enhanced, and integration with Apache Beam is
>> >> > one of the options I have in mind. Can you please describe how you plan
>> >> > to implement this? Will it run on top of the Ignite Compute Grid? How
>> >> > are you going to handle failover, especially in the case of async
>> >> > pipeline execution?
>> >> >
>> >> > -Val
>> >> >
>> >> > On Sat, Aug 15, 2020 at 12:50 PM Saikat Maitra <saikat.mai...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > I have been working on implementing the Apache Ignite Runner to run
>> >> > > Apache Beam pipelines. I have created IgniteRunner and
>> >> > > IgnitePipelineOptions. I have implemented the normalize pipeline
>> >> > > method and am currently working on the run method implementation for
>> >> > > Pipeline and IgnitePipelineTranslator.
>> >> > >
>> >> > > Jira: https://issues.apache.org/jira/browse/BEAM-9045
>> >> > >
>> >> > > PR: https://github.com/apache/beam/pull/12593
>> >> > >
>> >> > > Please review and feel free to share any feedback or questions.
>> >> > >
>> >> > > Regards,
>> >> > > Saikat
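The reactive engine Val describes in this thread (submit an event, persist it, process it, possibly emit follow-up events) can be modeled on a single JVM. The model shows where the at-least-once and exactly-once guarantees come from. This is a hypothetical sketch, not an Ignite API. The `ArrayDeque` stands in for events persisted in the cluster, and `Event` and `EventEngine` are illustrative names. A handler that throws leaves its event queued, so the event is retried (at-least-once). A set of processed event ids lets duplicate deliveries be skipped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** An event with a unique id; the id is what makes deduplication possible. */
final class Event {
    final String id;
    final String payload;

    Event(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

/**
 * Hypothetical single-JVM model of the event-driven engine. The deque stands in
 * for events persisted in the cluster; none of this is an existing Ignite API.
 */
final class EventEngine {
    private final Deque<Event> pending = new ArrayDeque<>(); // "persisted" events
    private final Set<String> processed = new HashSet<>();    // ids already handled
    private final Map<String, Integer> attempts = new HashMap<>();
    private final Function<Event, List<Event>> handler;
    private final int maxAttempts;
    final List<String> completed = new ArrayList<>(); // ids in completion order

    EventEngine(Function<Event, List<Event>> handler, int maxAttempts) {
        this.handler = handler;
        this.maxAttempts = maxAttempts;
    }

    void submit(Event event) {
        pending.addLast(event);
    }

    /** Processes events until none are left, retrying failed handlers. */
    void runToCompletion() {
        while (!pending.isEmpty()) {
            Event event = pending.peekFirst();
            if (processed.contains(event.id)) {
                pending.pollFirst(); // duplicate delivery: skip it
                continue;
            }
            List<Event> followUps;
            try {
                followUps = handler.apply(event);
            } catch (RuntimeException e) {
                int n = attempts.merge(event.id, 1, Integer::sum);
                if (n >= maxAttempts) {
                    throw e; // give up; the event is still queued
                }
                continue; // at-least-once: the event stays queued and is retried
            }
            // "Commit": mark done, dequeue and enqueue follow-ups in one step.
            processed.add(event.id);
            pending.pollFirst();
            followUps.forEach(pending::addLast);
            completed.add(event.id);
        }
    }
}
```

Skipping duplicates only gives exactly-once effects here because three operations happen in one uninterrupted step: marking an event as processed, removing it, and enqueuing its follow-ups. Across a cluster, that step would need to be atomic (for example, transactional). This is one plausible reading of the "certain assumptions" under which exactly-once can hold.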