Hi Christian,

It is great to see iterative use cases, thanks for sharing your problem!
Superstep-based iterative BSP synchronization for streams is a problem we have been looking into extensively; however, this functionality is not yet standardised in Flink. I think your use case is fully covered by our proposed approach, described in a research talk at Flink Forward 2018 in Berlin [1] (there is probably also a video available on the dataArtisans website). Take a look, and if this approach satisfies your needs and you would like to test your application with our current prototype, please do PM me!

Paris

[1] https://www.slideshare.net/FlinkForward/flink-forward-berlin-2018-paris-carbone-stream-loops-on-flink-reinventing-the-wheel-for-the-streaming-era

> On 29 Sep 2018, at 20:51, Christian Lehner <christian.lehne...@gmail.com> wrote:
>
> Hi all,
>
> if you don't want to read the wall of text below, in short, I want to know if it is possible to get a superstep-based iteration on a possibly unbounded DataStream in Flink in an efficient way, and what general concept(s) of synchronization you would suggest for that.
>
> I would like to write a program that has different vertices (realized just as Longs for now) in a graph, which all store a keyed state and communicate with each other through messages that arrive in an iterated stream.
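A minimal plain-Java model of this setup may help pin down the semantics. It deliberately uses no Flink APIs: a `Deque` stands in for the iterated stream's feedback edge, and a `Map` keyed by vertex id stands in for keyed state. The class `FeedbackLoopModel` and its vertex program (each vertex keeps the first hop count it receives and forwards `hops + 1` to its neighbours) are hypothetical, chosen only because the result visibly depends on the order in which messages arrive. Records require Java 16 or later.

```java
import java.util.*;

/**
 * Plain-Java model (no Flink APIs) of vertices that keep keyed state and
 * exchange messages through a feedback loop, without any synchronization.
 */
class FeedbackLoopModel {
    record Msg(long target, int hops) {}

    /**
     * Hypothetical vertex program: a vertex keeps the FIRST hop count it
     * receives and forwards hops + 1 to its neighbours. Which message is
     * "first" depends on delivery order, so the result does too.
     */
    static Map<Long, Integer> run(Map<Long, List<Long>> graph, long source, boolean fifo) {
        Map<Long, Integer> state = new HashMap<>();   // "keyed state": vertex -> hops
        Deque<Msg> feedback = new ArrayDeque<>();     // stands in for the iterated stream
        feedback.addLast(new Msg(source, 0));
        while (!feedback.isEmpty()) {
            // FIFO vs LIFO emulates two possible arrival orders of the same messages.
            Msg m = fifo ? feedback.pollFirst() : feedback.pollLast();
            if (state.containsKey(m.target())) {
                continue; // state already set; later messages are ignored
            }
            state.put(m.target(), m.hops());
            for (long n : graph.getOrDefault(m.target(), List.of())) {
                feedback.addLast(new Msg(n, m.hops() + 1)); // fed back into the loop
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> triangle = Map.of(
                0L, List.of(1L, 2L),
                1L, List.of(0L, 2L),
                2L, List.of(0L, 1L));
        System.out.println("FIFO delivery: " + run(triangle, 0L, true));
        System.out.println("LIFO delivery: " + run(triangle, 0L, false));
    }
}
```

On the triangle graph in `main`, FIFO delivery gives vertex 1 a hop count of 1, while LIFO delivery of the very same messages gives it 2, which is the kind of order dependence that superstep barriers are meant to remove.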
>
> From the outside I would only get the messages to add (or, possibly in the future, delete, however that can be ignored for now) a certain vertex with some possible additional information specified by the program (this message can be assumed to have the same form as any other message). The rest would happen through an iterated stream keyed by the vertex to which the message is addressed, in which a vertex, through a KeyedProcessFunction (or a KeyedBroadcastProcessFunction if a BroadcastStream is used for synchronization), can send new messages to any other vertex (or itself) based on the received message(s) and its own current state, and can also update its state based on the received message(s). The new messages would then be fed back into the iterated stream. If no synchronization is done this works quite well; however, it doesn't produce helpful results for my problem, since no order in which the messages arrive can be guaranteed.
>
> What I would optimally like to have is a Pregel-like, superstep-based iteration which runs on a batch of outside messages (here: vertex additions) until no more messages are produced, and after that repeats this with the next batch of vertices, either infinitely or until no more new messages are received. During the execution of each batch all vertices (including older ones) can be activated again by receiving a message, and the state of each vertex should be preserved throughout the execution of the program. The problem lies in how I can separate the messages into supersteps in an iterative partitioned stream, similar to the iterations in the DataSet API.
>
> One idea I had was just making tumbling windows of a large enough amount of time, which would just collect all the messages and then emit them in a ProcessWindowFunction once the window fires.
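The window idea amounts to bucketing messages by timestamp and treating each bucket as one superstep. The sketch below is plain Java, not Flink's window API; it assumes non-negative millisecond timestamps and zero-offset tumbling windows covering `[start, start + size)`, and the names `TumblingSupersteps`, `TimedMsg`, `windowStart` and `assign` are hypothetical.

```java
import java.util.*;

/**
 * Plain-Java model (no Flink APIs) of "one tumbling window = one superstep":
 * each message is assigned to the window [start, start + size) that contains
 * its timestamp, and all messages of a window are released together.
 */
class TumblingSupersteps {
    record TimedMsg(long timestampMillis, long target) {}

    /** Start of the zero-offset tumbling window containing ts (assumes ts >= 0, size > 0). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Groups messages by window start; the TreeMap iterates windows in time order. */
    static SortedMap<Long, List<TimedMsg>> assign(List<TimedMsg> msgs, long size) {
        SortedMap<Long, List<TimedMsg>> windows = new TreeMap<>();
        for (TimedMsg m : msgs) {
            windows.computeIfAbsent(windowStart(m.timestampMillis(), size), k -> new ArrayList<>())
                   .add(m);
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 1_000; // window size in ms; must exceed the slowest superstep
        List<TimedMsg> msgs = List.of(
                new TimedMsg(10, 1),     // superstep 1
                new TimedMsg(990, 2),    // superstep 1
                new TimedMsg(1_050, 3)); // hypothetically produced in superstep 1, but late
        assign(msgs, size).forEach((start, batch) ->
                System.out.println("window starting at " + start + " ms: " + batch));
    }
}
```

In `main`, the message stamped 1,050 ms lands in the window starting at 1,000 ms even if it logically belongs to the first superstep, which is why this scheme only works if the window size exceeds the worst-case time needed to finish a superstep.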
> While this would be quite a simple solution that requires little non-parallel synchronization, it would obviously require that we know a time span within which all messages are guaranteed to have been processed and all new messages for the next superstep produced, which is realistically not the case. It would also mean that in most supersteps the program would wait longer than necessary before starting the next superstep. Fault tolerance would also be very hard to achieve.
>
> Another, more complex idea was to globally synchronize with an object that is informed before any message is sent (so it remembers which vertices have been sent messages in the previous superstep), is also informed when a vertex is done processing a message, and tells the vertex when there are globally no more messages to be processed. If that is the case, the vertex then sends a NextSuperstep message, which is broadcast to all partitions with a BroadcastStream. After that, all vertices can start processing all messages sent to them in the previous superstep. Apart from not being trivial to synchronize correctly (which I'm working on myself), this approach has the obvious disadvantage that a lot of information has to be passed to this object in a globally synchronized manner, which kind of kills the point of parallel processing. Although some global synchronization probably has to take place, this approach seems rather inefficient to me.
>
> Since I haven't been working with Flink for very long (although I have used it intensively for the past couple of weeks and read all relevant documentation I could find), I would like to ask if someone has a suggestion for how to implement such a superstep-based iteration in the DataStream API in the most efficient way, and whether you think this is actually a worthwhile endeavor.
> I would especially like to know if Flink already provides classes, methods or concepts that would be helpful for that.
>
> Since our project isn't really close to a finished program yet and consists mainly of various test programs, I cannot really show you complete code of what I already have, but if anyone has any specific questions I can probably send you pseudocode or the Java code of one of these test programs to describe what I have in mind. Also, since we are still relatively open on how exactly we want to solve our original problem, I'm also open to suggestions which solve only a similar problem, even if they don't fully fit what I described above.
>
> It's of course also possible that there is already a simple solution in Flink which I somehow managed to overlook until now. In that case I'm sorry for bothering you, but I would still like to know what I should look up.
>
> Best,
> Christian
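For comparison, the superstep semantics asked about above (run a batch of outside messages until no more messages are produced, while keeping vertex state across batches) can be simulated in a single process. This is a plain-Java sketch (Java 16+ for records), neither Flink code nor the prototype mentioned above. It assumes, hypothetically, that each outside message adds an undirected edge, and it uses min-label propagation as the vertex program, so connected vertices converge to the smallest id among them. The barrier is simply that every message of superstep k is collected before any message of superstep k+1 is delivered; in a distributed job this is exactly the point where some form of termination detection, such as the global message counting in the second idea, becomes necessary.

```java
import java.util.*;

/**
 * Single-process simulation (no Flink APIs) of Pregel-style supersteps over
 * batches of outside messages. Hypothetical vertex program: min-label propagation.
 */
class SuperstepSimulation {
    record Message(long target, long value) {}

    // Stand-in for keyed state: survives across batches.
    final Map<Long, Long> label = new HashMap<>();
    final Map<Long, Set<Long>> neighbours = new HashMap<>();
    int superstepsRun = 0;

    /** Applies one batch of edge additions, then runs supersteps until no messages are produced. */
    void runBatch(List<long[]> edges) {
        List<Message> current = new ArrayList<>();
        for (long[] e : edges) {
            addVertex(e[0]);
            addVertex(e[1]);
            neighbours.get(e[0]).add(e[1]);
            neighbours.get(e[1]).add(e[0]);
            // Each endpoint learns the other's current label in the first superstep.
            current.add(new Message(e[0], label.get(e[1])));
            current.add(new Message(e[1], label.get(e[0])));
        }
        while (!current.isEmpty()) {
            superstepsRun++;
            // Barrier: collect (and combine) every message of this superstep first.
            Map<Long, Long> inbox = new HashMap<>();
            for (Message m : current) {
                inbox.merge(m.target(), m.value(), Math::min);
            }
            // Messages sent now are only delivered in the next superstep.
            List<Message> next = new ArrayList<>();
            for (Map.Entry<Long, Long> in : inbox.entrySet()) {
                long vertex = in.getKey();
                long candidate = in.getValue();
                if (candidate < label.get(vertex)) {
                    label.put(vertex, candidate);
                    for (long n : neighbours.get(vertex)) {
                        next.add(new Message(n, candidate));
                    }
                }
            }
            current = next; // an empty list means this batch has converged
        }
    }

    private void addVertex(long id) {
        label.putIfAbsent(id, id);
        neighbours.putIfAbsent(id, new HashSet<>());
    }

    public static void main(String[] args) {
        SuperstepSimulation sim = new SuperstepSimulation();
        sim.runBatch(List.of(new long[] {1, 2}, new long[] {2, 3}));
        System.out.println("after batch 1: " + sim.label);
        sim.runBatch(List.of(new long[] {7, 8}, new long[] {8, 3}));
        System.out.println("after batch 2: " + sim.label + ", supersteps so far: " + sim.superstepsRun);
    }
}
```

Because each superstep reads only the previous superstep's messages, the outcome no longer depends on the order in which messages arrive within a superstep, and older vertices (vertex 3 in `main`) are reactivated when a later batch connects new vertices to them.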