Hi Christian,

It is great to see iterative use cases, thanks for sharing your problem!

Superstep iterative BSP synchronization for streams is a problem we have been 
looking into extensively; however, this functionality is not yet standardised 
on Flink.
I think your use case is fully covered by our proposed approach, described in a 
research talk at Flink Forward 18 in Berlin [1] (probably there is a video 
available too at the dataArtisans website). 
Take a look, and if this approach satisfies your needs and you would like 
to test your application with our current prototype, please do PM me!

Paris

[1] 
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2018-paris-carbone-stream-loops-on-flink-reinventing-the-wheel-for-the-streaming-era

> On 29 Sep 2018, at 20:51, Christian Lehner <christian.lehne...@gmail.com> 
> wrote:
> 
> Hi all,
> 
> 
> if you don't want to read the wall of text below, in short, I want to know if 
> it is possible to get a superstep-based iteration on a possibly unbounded 
> DataStream in Flink in an efficient way and what general concept(s) of 
> synchronization you would suggest for that.
> 
> 
> I would like to write a program that has different vertices (realized just as 
> Longs for now) in a graph which all store a keyed state and communicate with 
> each other with messages that arrive in an iterated stream.
> 
> From the outside I would only get the messages to add (or, possibly in the 
> future, delete, however that can be ignored for now) a certain vertex with 
> some possible additional information specified by the program (this message 
> can be assumed to have the same form as any other message) and then the rest 
> would happen through an iterated stream keyed by the vertex to which the 
> message is addressed in which a vertex through a KeyedProcessFunction (or 
> KeyedBroadcastProcessFunction if a BroadcastStream is used for 
> synchronization) can send new messages to any other vertex (or itself) based 
> on the received message(s) and its own current state and can also update its 
> state based on the received message(s). The new messages would then be fed 
> back into the iterated stream. If no synchronization is done this works quite 
> well, however it doesn't produce helpful results for my problem since no 
> order in which the messages arrive can be guaranteed.
> 
> What I would optimally like to have is a pregel-like superstep-based 
> iteration which runs on a batch of outside messages (here: vertex additions) 
> until no more messages are produced and after that repeats that with the next 
> batch of vertices either infinitely or until there are no more new messages 
> received. During the execution of each batch all vertices (including older 
> ones) can be activated again by receiving a message and the state of each 
> vertex should be preserved throughout the execution of the program. The 
> problem lies in how I can separate the messages into supersteps in an 
> iterative partitioned stream similar to the iterations in the DataSet API.
> 
> One idea I had was just making tumbling windows of a large enough amount of 
> time which would just collect all the messages and then emit them in a 
> ProcessWindowFunction once the window fires. While this would be quite a 
> simple solution that requires little non-parallel synchronization, it would 
> obviously require that we know such a time in which we can be guaranteed that 
> all messages have been processed and all new messages for the next superstep 
> produced which is realistically not the case. It would also mean that in most 
> supersteps the program would wait longer than necessary until it starts the 
> next superstep. Fault tolerance would also be very hard to achieve.
> 
> Another more complex idea was to just globally synchronize with an object 
> that remembers which vertices have been sent messages in the previous 
> superstep by being informed before any message is sent and then is also 
> informed when a vertex is done with processing a message and informs the 
> vertex if there globally are no more messages to be processed. If that is the 
> case the vertex then sends a NextSuperstep message which is broadcast to all 
> partitions with a BroadcastStream. After that all vertices can start with 
> processing all messages sent to them in the previous superstep. Other than 
> not being trivial to synchronize without any problems (which I'm working on 
> myself) this approach has the obvious disadvantage that a lot of information 
> has to be passed to this object in a globally synchronized manner which kind 
> of kills the point of parallel processing. Although it is obvious that some 
> global synchronization probably has to take place this approach seems rather 
> ineffective to me.
> 
> Since I haven't been working with Flink for very long, although I have 
> intensively used it for the past couple of weeks and read all relevant 
> documentation I could find, I would like to ask if someone has a suggestion 
> how to implement such a superstep-based iteration in the DataStream API in the 
> most efficient way with Flink and if you think this is actually even a 
> worthwhile endeavor. I would especially like to know if Flink already 
> provides classes, methods or concepts that would be helpful for that.
> 
> Since our project isn't really close to a finished program yet and consists 
> mainly of various test programs, I cannot really show you a complete code of 
> what I already have, but if anyone has any specific questions I probably can 
> send you a pseudocode or a java code of one of these test programs to 
> describe what I imagine. Also, since we are still relatively open on how 
> exactly we want to solve our original problem, I'm also open to suggestions 
> which solve only a similar problem, even if they don't fully fit what I 
> described above.
> 
> It's of course also possible that there is already a simple solution in Flink 
> which I somehow managed to overlook until now. In that case I'm sorry for 
> bothering you but I would still like to know what I should look up.
> 
> 
> Best, Christian
> 
> 
> 
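
For readers following the thread, the superstep semantics asked for in the quoted message can be sketched in plain Java, without any Flink API. This is only a single-threaded simulation under stated assumptions, not the prototype mentioned in the reply and not an existing Flink feature. The names SuperstepSketch, Message, VertexProgram, runBatch and COUNTDOWN are all hypothetical. Each batch of outside messages runs until no more messages are produced, vertex state survives across batches, and the swap of the message buffers plays the role of the superstep barrier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SuperstepSketch {
    /** A message addressed to a vertex; vertices are plain longs, as in the question. */
    static final class Message {
        final long target;
        final long value;
        Message(long target, long value) { this.target = target; this.value = value; }
    }

    /** Hypothetical vertex program: returns the new state, appends outgoing messages to out. */
    interface VertexProgram {
        long compute(long vertex, Long oldState, List<Message> inbox, List<Message> out);
    }

    /** Runs one batch until no messages remain; returns the number of supersteps executed. */
    static int runBatch(List<Message> batch, Map<Long, Long> state, VertexProgram program) {
        List<Message> current = new ArrayList<>(batch);
        int supersteps = 0;
        while (!current.isEmpty()) {
            // Group the messages of this superstep by target vertex (analogous to keyBy).
            Map<Long, List<Message>> inboxes = new TreeMap<>();
            for (Message m : current) {
                inboxes.computeIfAbsent(m.target, k -> new ArrayList<>()).add(m);
            }
            // Messages produced now are only delivered in the next superstep.
            List<Message> next = new ArrayList<>();
            for (Map.Entry<Long, List<Message>> e : inboxes.entrySet()) {
                long vertex = e.getKey();
                long updated = program.compute(vertex, state.get(vertex), e.getValue(), next);
                state.put(vertex, updated);
            }
            current = next; // the barrier: the superstep ends once every inbox is processed
            supersteps++;
        }
        return supersteps;
    }

    /** Example program: remember the largest value seen, forward value - 1 to vertex + 1. */
    static final VertexProgram COUNTDOWN = (vertex, oldState, inbox, out) -> {
        long best = (oldState == null) ? Long.MIN_VALUE : oldState;
        for (Message m : inbox) {
            best = Math.max(best, m.value);
            if (m.value > 0) {
                out.add(new Message(vertex + 1, m.value - 1));
            }
        }
        return best;
    };

    public static void main(String[] args) {
        Map<Long, Long> state = new HashMap<>(); // preserved across batches
        int first = runBatch(List.of(new Message(1L, 2L)), state, COUNTDOWN);
        int second = runBatch(List.of(new Message(2L, 5L)), state, COUNTDOWN);
        System.out.println("supersteps: " + first + ", " + second + "; state: " + state);
    }
}
```

The loop only terminates if the vertex program eventually stops producing messages. In a distributed setting the hard part is exactly the line marked as the barrier: deciding globally that every inbox of the current superstep has been processed, which is the synchronization problem discussed in this thread.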
