Re: Superstep-like synchronization of streaming iteration

2018-10-01 Thread Paris Carbone
Hi Christian,

It is great to see iterative use cases; thanks for sharing your problem!

Superstep-based (BSP) synchronization of streaming iterations is a problem we 
have been looking into extensively; however, this functionality is not yet 
standardised in Flink.
I think your use case is fully covered by our proposed approach, described in a 
research talk at Flink Forward 2018 in Berlin [1] (a video is probably also 
available on the dataArtisans website). 
Take a look, and if this approach satisfies your needs and you would like 
to test your application with our current prototype, please do PM me!

Paris

[1] 
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2018-paris-carbone-stream-loops-on-flink-reinventing-the-wheel-for-the-streaming-era

> On 29 Sep 2018, at 20:51, Christian Lehner wrote:

Superstep-like synchronization of streaming iteration

2018-09-29 Thread Christian Lehner

Hi all,


if you don't want to read the wall of text below, in short, I want to 
know if it is possible to get a superstep-based iteration on a possibly 
unbounded DataStream in Flink in an efficient way and what general 
concept(s) of synchronization you would suggest for that.



I would like to write a program with different vertices (realized 
just as Longs for now) in a graph, each of which stores keyed state 
and communicates with the others via messages that arrive in an 
iterated stream.


From the outside I would only receive messages that add (or possibly, 
in the future, delete; that can be ignored for now) a certain vertex, 
along with some optional additional information specified by the 
program (this message can be assumed to have the same form as any other 
message). The rest would happen through an iterated stream keyed by the 
vertex to which each message is addressed. Through a 
KeyedProcessFunction (or a KeyedBroadcastProcessFunction if a 
BroadcastStream is used for synchronization), a vertex can send new 
messages to any other vertex (or to itself) based on the received 
message(s) and its own current state, and it can also update its state 
based on the received message(s). The new messages are then fed back 
into the iterated stream. Without any synchronization this works quite 
well, but it doesn't produce useful results for my problem, since no 
order in which the messages arrive can be guaranteed.
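
To make the ordering problem concrete, here is a minimal, Flink-free Java 
simulation of the unsynchronized loop. It is only a sketch under stated 
assumptions: the `UnsyncLoop` class and `Msg` record are hypothetical, a 
Map stands in for keyed state, each message is tagged with the round (hop 
count) that produced it, and the feedback edge is modelled as a stack 
(newest message first) to reproduce the missing ordering guarantee 
deterministically. In a real Flink job the reordering would come from 
interleaving across parallel channels instead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free simulation of an unsynchronized feedback loop (not Flink code).
class UnsyncLoop {
    // Hypothetical message: target vertex id and the round (hop count) that produced it.
    record Msg(long target, int round) {}

    /** Returns, per vertex, the rounds of the messages it processed, in processing order. */
    static Map<Long, List<Integer>> run() {
        // Stand-in for keyed state: vertex id -> rounds seen so far.
        Map<Long, List<Integer>> seen = new TreeMap<>();
        // Feedback edge modelled as a stack: an assumed worst case of "no ordering guarantee".
        Deque<Msg> feedback = new ArrayDeque<>();
        // External input: vertex additions for vertices 1 and 2 (round 0).
        feedback.push(new Msg(1L, 0));
        feedback.push(new Msg(2L, 0));
        while (!feedback.isEmpty()) {
            Msg m = feedback.pop();
            seen.computeIfAbsent(m.target(), k -> new ArrayList<>()).add(m.round());
            // Hypothetical vertex logic: every vertex messages vertex 3, for at most two rounds.
            if (m.round() < 2) {
                feedback.push(new Msg(3L, m.round() + 1));
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints `{1=[0], 2=[0], 3=[1, 2, 1, 2]}`: vertex 3 handles a 
round-2 message before the second round-1 message, whereas superstep 
semantics would deliver both round-1 messages together before any 
round-2 message.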


What I would ideally like to have is a Pregel-like, superstep-based 
iteration that runs on a batch of outside messages (here: vertex 
additions) until no more messages are produced, and then repeats this 
with the next batch of vertices, either indefinitely or until no new 
messages are received. During the execution of each batch, all vertices 
(including older ones) can be activated again by receiving a message, 
and the state of each vertex should be preserved throughout the 
execution of the program. The problem lies in how I can separate the 
messages into supersteps in an iterative, partitioned stream, similar 
to the iterations in the DataSet API.
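
For comparison, the desired superstep semantics can be written down as a 
small single-process Java sketch (plain Java, not a Flink API): messages 
produced in superstep k are buffered and delivered only in superstep 
k + 1, a batch ends when a superstep produces no messages, and the vertex 
state map survives from one batch to the next. The `SuperstepLoop` class, 
the fixed `edges` map and the minimum-value vertex program (a 
connected-components style label propagation) are illustrative 
assumptions, not part of the original program.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process sketch of Pregel-style superstep semantics (plain Java, not a Flink API).
class SuperstepLoop {
    // Hypothetical message: target vertex id and a value (here: a candidate minimum label).
    record Msg(long target, long value) {}

    // Stand-in for keyed vertex state; it survives across batches.
    final Map<Long, Long> state = new TreeMap<>();
    // Hypothetical adjacency lists used by the example vertex program.
    final Map<Long, List<Long>> edges = new HashMap<>();

    /** Runs supersteps over one batch of outside messages until none remain; returns the superstep count. */
    int runBatch(List<Msg> batch) {
        List<Msg> inbox = new ArrayList<>(batch);
        int supersteps = 0;
        while (!inbox.isEmpty()) {
            supersteps++;
            // Group this superstep's messages by target vertex, like a keyBy on the target.
            Map<Long, List<Long>> byVertex = new TreeMap<>();
            for (Msg m : inbox) {
                byVertex.computeIfAbsent(m.target(), k -> new ArrayList<>()).add(m.value());
            }
            // Messages sent now are only delivered in the next superstep.
            List<Msg> outbox = new ArrayList<>();
            for (Map.Entry<Long, List<Long>> e : byVertex.entrySet()) {
                compute(e.getKey(), e.getValue(), outbox);
            }
            // Barrier: the buffers are swapped only after every active vertex has run.
            inbox = outbox;
        }
        return supersteps;
    }

    /** Example vertex program: keep the smallest value seen and forward improvements to neighbours. */
    void compute(long vertex, List<Long> messages, List<Msg> out) {
        long best = Collections.min(messages);
        Long current = state.get(vertex);
        if (current == null || best < current) {
            state.put(vertex, best);
            for (long neighbour : edges.getOrDefault(vertex, List.of())) {
                out.add(new Msg(neighbour, best));
            }
        }
    }

    public static void main(String[] args) {
        SuperstepLoop loop = new SuperstepLoop();
        // Batch 1: undirected path 1 - 2 - 3; each addition message carries the vertex id as its value.
        loop.edges.put(1L, List.of(2L));
        loop.edges.put(2L, List.of(1L, 3L));
        loop.edges.put(3L, List.of(2L));
        int s1 = loop.runBatch(List.of(new Msg(1, 1), new Msg(2, 2), new Msg(3, 3)));
        System.out.println(s1 + " " + loop.state);
        // Batch 2: vertex 4 joins with value 0, attached to vertex 3, and reactivates older vertices.
        loop.edges.put(3L, List.of(2L, 4L));
        loop.edges.put(4L, List.of(3L));
        int s2 = loop.runBatch(List.of(new Msg(4, 0)));
        System.out.println(s2 + " " + loop.state);
    }
}
```

Running it prints `4 {1=1, 2=1, 3=1}` and then `5 {1=0, 2=0, 3=0, 4=0}`: 
the second batch only adds vertex 4, yet it reactivates the older 
vertices, whose state was kept from the first batch. The swap of `inbox` 
and `outbox` is the barrier that is hard to reproduce in a parallel, 
unbounded stream.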


One idea I had was to use tumbling windows of a sufficiently large size 
that would simply collect all the messages and emit them in a 
ProcessWindowFunction once the window fires. While this would be quite 
a simple solution that requires little non-parallel synchronization, it 
would obviously require us to know a time span within which all 
messages are guaranteed to have been processed and all new messages for 
the next superstep produced, which realistically is not the case. It 
would also mean that in most supersteps the program would wait longer 
than necessary before starting the next superstep. Fault tolerance 
would also be very hard to achieve.
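
The window idea can be checked with a tiny Flink-free sketch: a tumbling 
window of size `size` (zero offset) assigns every message to the window 
`[start, start + size)` containing its timestamp, with 
`start = ts - floorMod(ts, size)`, regardless of which superstep 
produced it. The `WindowSupersteps` class and the timings below are 
hypothetical; they only illustrate what happens when a single message of 
one superstep arrives after the window boundary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free sketch of assigning messages to tumbling time windows (zero offset).
class WindowSupersteps {
    // Hypothetical message: arrival timestamp in ms and the superstep it belongs to.
    record Msg(long timestampMs, int superstep) {}

    /** Start of the tumbling window of the given size that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Groups the messages' supersteps by window start, i.e. what each window firing would emit. */
    static Map<Long, List<Integer>> assign(List<Msg> msgs, long size) {
        Map<Long, List<Integer>> windows = new TreeMap<>();
        for (Msg m : msgs) {
            windows.computeIfAbsent(windowStart(m.timestampMs(), size), k -> new ArrayList<>())
                   .add(m.superstep());
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical timings with 100 ms windows: one superstep-1 message is late (130 ms).
        List<Msg> msgs = List.of(
                new Msg(10, 1), new Msg(40, 1), new Msg(130, 1),
                new Msg(150, 2), new Msg(180, 2));
        System.out.println(assign(msgs, 100));
    }
}
```

This prints `{0=[1, 1], 100=[1, 2, 2]}`: the straggler from superstep 1 
is emitted together with superstep 2, which is why the window size would 
have to bound the slowest superstep.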


Another, more complex idea was to synchronize globally through an 
object that remembers which vertices have been sent messages in the 
previous superstep: it is informed before any message is sent, it is 
also informed when a vertex has finished processing a message, and it 
tells the vertex when there are globally no more messages to be 
processed. In that case the vertex sends a NextSuperstep message, which 
is broadcast to all partitions via a BroadcastStream. After that, all 
vertices can start processing all messages sent to them in the previous 
superstep. Apart from not being trivial to synchronize correctly (which 
I'm working on myself), this approach has the obvious disadvantage that 
a lot of information has to be passed to this object in a globally 
synchronized manner, which kind of defeats the point of parallel 
processing. Although some global synchronization probably has to take 
place, this approach seems rather inefficient to me.
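
The counting part of this idea can be reduced to a small state machine, 
sketched here in plain Java under the assumption that only the number of 
outstanding messages matters, not which vertices received them. 
`SuperstepCoordinator` and its methods are hypothetical names: 
`beforeSend()` is called before a vertex emits a message for the next 
superstep, `onProcessed()` after a vertex has finished one message of the 
current superstep, and a return value of `true` means a NextSuperstep 
message should be broadcast. Because every send is registered before the 
message that triggered it is reported as processed, a current-superstep 
count of zero means all messages for the next superstep are already 
accounted for.

```java
// Hypothetical single-threaded coordinator for the counting idea (not a Flink class).
class SuperstepCoordinator {
    private int superstep = 0;
    private long pendingCurrent;   // messages of the current superstep not yet processed
    private long sentForNext = 0;  // messages already sent for the next superstep

    SuperstepCoordinator(long initialMessages) {
        this.pendingCurrent = initialMessages;
    }

    /** Called before a vertex emits a message; the message belongs to the next superstep. */
    void beforeSend() {
        sentForNext++;
    }

    /** Called after a vertex has processed one message; true means "broadcast NextSuperstep". */
    boolean onProcessed() {
        if (pendingCurrent <= 0) {
            throw new IllegalStateException("no message of the current superstep is pending");
        }
        pendingCurrent--;
        if (pendingCurrent > 0 || sentForNext == 0) {
            return false;  // superstep still running, or the batch has terminated
        }
        superstep++;
        pendingCurrent = sentForNext;
        sentForNext = 0;
        return true;
    }

    /** True once no messages are pending and none were sent for a further superstep. */
    boolean terminated() {
        return pendingCurrent == 0 && sentForNext == 0;
    }

    int superstep() {
        return superstep;
    }

    public static void main(String[] args) {
        SuperstepCoordinator c = new SuperstepCoordinator(2);  // two vertex additions
        c.beforeSend();
        c.beforeSend();
        System.out.println(c.onProcessed());  // first addition sent two messages
        c.beforeSend();
        System.out.println(c.onProcessed());  // second addition sent one message
        for (int i = 0; i < 3; i++) {
            c.onProcessed();  // superstep 1: three messages, none of which send anything
        }
        System.out.println(c.superstep() + " " + c.terminated());
    }
}
```

It prints `false`, `true` and then `1 true`. Note that this only shrinks 
the state the coordinator keeps; every send and every completion still 
has to reach it, which is the global bottleneck described above.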


Since I haven't been working with Flink for very long (although I have 
used it intensively for the past couple of weeks and read all the 
relevant documentation I could find), I would like to ask whether 
someone has a suggestion on how to implement such a superstep-based 
iteration in the DataStream API in the most efficient way, and whether 
you think this is actually a worthwhile endeavor. I would especially 
like to know if Flink already provides classes, methods or concepts 
that would be helpful for that.


Since our project isn't really close to a finished program yet and 
consists mainly of various test programs, I cannot really show you the 
complete code of what I already have, but if anyone has specific 
questions I can probably send pseudocode or the Java code of one of 
these test programs to describe what I have in mind. Also, since we are 
still relatively open on how exactly we want to solve our original 
problem, I'm also open to suggestions that solve only a similar 
problem, even if they don't fully fit what I described above.


It's of course also possible that there is already a simple solution in 
Flink which I