I don't think there is a fundamental limitation to the simpler approach. The only real difference is that the degrees of parallelism (DOPs) are adjusted before the tail, so only one head/tail pair is needed.
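To make the "adjust the DOP before the tail" rule concrete, here is a rough sketch of how the feedback path could be assembled. This is a toy model and not Flink code: the class `SingleTailSketch`, the method `feedbackPath`, and the string labels are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: operators are plain "name(parallelism)" labels.
class SingleTailSketch {

    // Builds the feedback path of one iteration under the "one head/tail pair" rule.
    static List<String> feedbackPath(int headParallelism, int userTailParallelism) {
        List<String> path = new ArrayList<>();
        path.add("userTail(" + userTailParallelism + ")");
        if (userTailParallelism != headParallelism) {
            // DOP mismatch: the user's operator is not the real tail. The stream is
            // redistributed into an extra tail that matches the head's parallelism.
            path.add("redistribute");
            path.add("extraTail(" + headParallelism + ")");
        }
        path.add("head(" + headParallelism + ")");
        return path;
    }

    public static void main(String[] args) {
        System.out.println(feedbackPath(4, 4)); // parallelism matches, no extra operator
        System.out.println(feedbackPath(4, 6)); // mismatch, extra tail is attached
    }
}
```

With matching parallelism the user's operator closes the loop directly; otherwise the only addition is one redistribution step plus one extra tail, so the head/tail pair stays unique.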
Nested iterations should still be possible...

On Mon, Aug 3, 2015 at 10:21 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> It is critical for many applications (such as SAMOA or Storm compatibility)
> to build arbitrary cyclic flows. If your suggestion covers all cases (for
> instance nested iterations) then I am not against it.
>
> The current implementation is just one way to do it, but it allows
> arbitrary cycles. From the checkpointing perspective, I don't think this
> will make too much of a difference as that will probably have to be handled
> on the receiver side anyways if you think about the cyclic algorithm.
>
> Gyula
>
> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, Aug 3, 2015, 9:41):
>
> > Yes, that's what I was proposing in my second mail:
> >
> > I thought about having some tighter restrictions here. My idea was to
> > enforce that the feedback edges must have the same parallelism as the
> > original input stream, otherwise shipping strategies such as "keyBy",
> > "shuffle", "rebalance" don't seem to make sense because they would differ
> > from the distribution of the original elements (at least IMHO). Maybe I'm
> > wrong there, though.
> >
> > To me it seems intuitive that I get the feedback at the head the way I
> > specify it at the tail. But maybe that's also just me... :D
> >
> > On Mon, 3 Aug 2015 at 00:14 Stephan Ewen <se...@apache.org> wrote:
> >
> > > This model strikes me as pretty complicated. Imagine the extra logic and
> > > code path necessary for proper checkpointing as well.
> > >
> > > Why not do a simple approach:
> > > - There is one parallel head, one parallel tail, both with the same
> > >   parallelism
> > >
> > > - Any computation in between may have its own parallelism, no special
> > >   cases
> > >
> > > - If the tail does not have the same parallelism as the head, it will not
> > >   be the tail, but flow will attach an additional tail operator.
> > >   Between the original tail and the additional tail, the streams are
> > >   redistributed to achieve the required parallelism.
> > >
> > > Wouldn't that give us the same and make things much easier? The batch
> > > iterations work that way, by the way.
> > >
> > > On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > To answer the question plain and simple: No, there are several different
> > > > parallel heads and tails.
> > > >
> > > > For example in this:
> > > > val iter = ds.iteration()
> > > >
> > > > val head_tail1 = iter.map().parallelism(2)
> > > > val head_tail2 = iter.map().parallelism(4)
> > > >
> > > > iter.closeWith(head_tail1.union(head_tail2))
> > > >
> > > > We have one head/tail pair with parallelism 2 and one with parallelism 4.
> > > >
> > > > Off the top of my head, I don't know what happens in this case though:
> > > >
> > > > val iter = ds.iteration()
> > > >
> > > > val head1 = iter.map().parallelism(2)
> > > > val head2 = iter.map().parallelism(4)
> > > >
> > > > val tail1 = head1.map().parallelism(6)
> > > > val tail2 = head2.map().parallelism(8)
> > > >
> > > > iter.closeWith(tail1.union(tail2))
> > > >
> > > > (Which is also tricky with the parallelism of the input stream)
> > > >
> > > > On Sun, 2 Aug 2015 at 21:22 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > > > In a streaming program when we create an IterativeDataStream, we
> > > > > practically mark the union point of some later feedback stream (the one
> > > > > passed in to closeWith(..)).
> > > > >
> > > > > The operators applied on this IterativeDataStream will receive the
> > > > > feedback input as well. We call the operators applied on the iterative
> > > > > dataStream head operators. We call the operators that produce the
> > > > > streams passed into closeWith tail operators.
> > > > > With this terminology we can have many heads and tails with varying
> > > > > parallelism.
> > > > >
> > > > > Stephan Ewen <se...@apache.org> wrote (on Sun, Aug 2, 2015, 20:16):
> > > > >
> > > > > > I don't get the discussion here, can you help me with what you mean
> > > > > > by "different iteration heads and tails"?
> > > > > >
> > > > > > An iteration does not have one parallel head and one parallel tail?
> > > > > >
> > > > > > On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <gyula.f...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Maybe you can reuse some of the logic that is currently there on the
> > > > > > > StreamGraph, with building StreamLoops first which will be used to
> > > > > > > generate the sources and sinks right before building the JobGraph.
> > > > > > > This avoids the need of knowing everything beforehand.
> > > > > > >
> > > > > > > I actually added this to avoid the complexities that you are
> > > > > > > probably facing now.
> > > > > > >
> > > > > > > Aljoscha Krettek <aljos...@apache.org> wrote (on Fri, Jul 31, 2015,
> > > > > > > 17:28):
> > > > > > >
> > > > > > > > Sure it can be done, it's just more complex if you try to do it in
> > > > > > > > a sane way without having the code that builds the StreamGraph all
> > > > > > > > over the place. :D
> > > > > > > >
> > > > > > > > I'll try to come up with something. This is my current work in
> > > > > > > > progress, by the way:
> > > > > > > > https://github.com/aljoscha/flink/tree/stream-api-rework
> > > > > > > >
> > > > > > > > I managed to ban the StreamGraph from StreamExecutionEnvironment
> > > > > > > > and the API classes such as DataStream.
> > > > > > > > The API methods construct a Graph of Transformation Nodes and
> > > > > > > > don't contain any information themselves. Then there is a
> > > > > > > > StreamGraphGenerator that builds a StreamGraph from the
> > > > > > > > transformations. The abstraction is very nice and simple, the only
> > > > > > > > problem that remains are the differing-parallelism-iterations but
> > > > > > > > I'll figure them out.
> > > > > > > >
> > > > > > > > P.S. The code is not well documented yet, but the base class for
> > > > > > > > transformations is StreamTransformation. From there anyone who
> > > > > > > > wants to check it out can find the other transformations.
> > > > > > > >
> > > > > > > > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.f...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > There might be reasons why a user would want different
> > > > > > > > > parallelism at the head operators (depending on what else that
> > > > > > > > > head operator might process) so restricting them to the same
> > > > > > > > > parallelism is a little bit weird, don't you think? It kind of
> > > > > > > > > goes against the whole operators-parallelism idea.
> > > > > > > > >
> > > > > > > > > I don't think it's a huge complexity to group head operators
> > > > > > > > > together by parallelism and add a source/sink per each group
> > > > > > > > > like we do now. What do you say?
> > > > > > > > >
> > > > > > > > > Aljoscha Krettek <aljos...@apache.org> wrote (on Fri, Jul 31,
> > > > > > > > > 2015, 17:10):
> > > > > > > > >
> > > > > > > > > > Yes, I'm not saying that it makes sense to do it, I'm just
> > > > > > > > > > saying that it does translate and run. Your observation is
> > > > > > > > > > true. :D
> > > > > > > > > >
> > > > > > > > > > I'm wondering whether it makes sense to allow users to have
> > > > > > > > > > iteration heads with differing parallelism, in fact.
> > > > > > > > > >
> > > > > > > > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.f...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > I still don't get how it could possibly work, let me tell
> > > > > > > > > > > you how I see it, and correct me in my logic :)
> > > > > > > > > > >
> > > > > > > > > > > You have this program:
> > > > > > > > > > > ids.map1().setParallelism(2)
> > > > > > > > > > > ids.map2().setParallelism(4)
> > > > > > > > > > >
> > > > > > > > > > > //...
> > > > > > > > > > >
> > > > > > > > > > > ids.closeWith(feedback.groupBy(0))
> > > > > > > > > > >
> > > > > > > > > > > You are suggesting that we only have one iteration
> > > > > > > > > > > source/sink pair with parallelism of either 2 or 4. I will
> > > > > > > > > > > assume that the parallelism is 2 for the sake of the
> > > > > > > > > > > argument.
> > > > > > > > > > >
> > > > > > > > > > > The iteration source is connected to map1 and map2 with
> > > > > > > > > > > Forward partitioning and the sink is connected with
> > > > > > > > > > > groupBy(0). Each sink instance will receive all tuples of a
> > > > > > > > > > > given key, which also means that each iteration source
> > > > > > > > > > > instance (2) will too.
> > > > > > > > > > >
> > > > > > > > > > > Now here comes the problem: the source will forward the
> > > > > > > > > > > tuples to map 1, and since we have a forward connection we
> > > > > > > > > > > maintain the groupby semantics (this is perfect). The
> > > > > > > > > > > sources will also forward to map 2, which has higher
> > > > > > > > > > > parallelism, so the tuple sending turns into round robin,
> > > > > > > > > > > which screws up the groupby.
> > > > > > > > > > >
> > > > > > > > > > > What did I miss?
> > > > > > > > > > > Gyula
> > > > > > > > > > >
> > > > > > > > > > > Aljoscha Krettek <aljos...@apache.org> wrote (on Fri, Jul
> > > > > > > > > > > 31, 2015, 14:59):
> > > > > > > > > > >
> > > > > > > > > > > > Yes, this would still work. For example, I have this crazy
> > > > > > > > > > > > graph: http://postimg.org/image/xtv8ay8hv/full/
> > > > > > > > > > > > That results from this program:
> > > > > > > > > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > > > > > > > > >
> > > > > > > > > > > > It works, and the implementation is very simple, actually.
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra
> > > > > > > > > > > > <gyula.f...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I mean that the head operators have different
> > > > > > > > > > > > > parallelism:
> > > > > > > > > > > > >
> > > > > > > > > > > > > IterativeDataStream ids = ...
> > > > > > > > > > > > >
> > > > > > > > > > > > > ids.map().setParallelism(2)
> > > > > > > > > > > > > ids.map().setParallelism(4)
> > > > > > > > > > > > >
> > > > > > > > > > > > > //...
> > > > > > > > > > > > >
> > > > > > > > > > > > > ids.closeWith(feedback)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Aljoscha Krettek <aljos...@apache.org> wrote (on Fri,
> > > > > > > > > > > > > Jul 31, 2015, 14:23):
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I thought about having some tighter restrictions here.
> > > > > > > > > > > > > > My idea was to enforce that the feedback edges must
> > > > > > > > > > > > > > have the same parallelism as the original input
> > > > > > > > > > > > > > stream, otherwise shipping strategies such as "keyBy",
> > > > > > > > > > > > > > "shuffle", "rebalance" don't seem to make sense
> > > > > > > > > > > > > > because they would differ from the distribution of the
> > > > > > > > > > > > > > original elements (at least IMHO). Maybe I'm wrong
> > > > > > > > > > > > > > there, though.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > To me it seems intuitive that I get the feedback at
> > > > > > > > > > > > > > the head the way I specify it at the tail. But maybe
> > > > > > > > > > > > > > that's also just me... :D
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra
> > > > > > > > > > > > > > <gyf...@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I am not sure what is the intuitive behaviour here.
> > > > > > > > > > > > > > > As you are not applying a transformation on the
> > > > > > > > > > > > > > > feedback stream but pass it to a closeWith method,
> > > > > > > > > > > > > > > I thought it was somehow natural that it gets the
> > > > > > > > > > > > > > > partitioning of the iteration input, but maybe
> > > > > > > > > > > > > > > it's not intuitive.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > If others also think that preserving feedback
> > > > > > > > > > > > > > > partitioning should be the default I am not against
> > > > > > > > > > > > > > > it :)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Btw, this still won't make it very simple. We still
> > > > > > > > > > > > > > > need as many source/sink pairs as we have different
> > > > > > > > > > > > > > > parallelism among the head operators. Otherwise the
> > > > > > > > > > > > > > > forwarding logic won't work.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > Gyula
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Aljoscha Krettek <aljos...@apache.org> wrote (on
> > > > > > > > > > > > > > > Fri, Jul 31, 2015, 11:52):
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > I'm currently working on making the StreamGraph
> > > > > > > > > > > > > > > > generation more centralized (i.e. not spread
> > > > > > > > > > > > > > > > across the different API classes).
> > > > > > > > > > > > > > > > The question is now why we need to switch to
> > > > > > > > > > > > > > > > preserve partitioning? Could we not make
> > > > > > > > > > > > > > > > "preserve" partitioning the default and if users
> > > > > > > > > > > > > > > > want to have shuffle partitioning or anything
> > > > > > > > > > > > > > > > they have to specify it manually when adding the
> > > > > > > > > > > > > > > > feedback edge?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This would make for a very simple scheme where
> > > > > > > > > > > > > > > > the iteration sources are always connected to the
> > > > > > > > > > > > > > > > heads using "forward" and the tails are connected
> > > > > > > > > > > > > > > > to the iteration sinks using whatever partitioner
> > > > > > > > > > > > > > > > was set by the user. This would make it more
> > > > > > > > > > > > > > > > transparent than the current default of the
> > > > > > > > > > > > > > > > "shuffle" between tails and iteration sinks.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > Aljoscha
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > P.S. I know we had quite some discussion about
> > > > > > > > > > > > > > > > introducing "preserve partitioning" but now, when
> > > > > > > > > > > > > > > > I think of it, it should be the default...
> > > > > > > > > > > > > > > > :D
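
For reference, the forward vs. round-robin argument from the 16:40 mail quoted above can be played through with a small toy model. This is only a sketch under stated assumptions and not Flink code: the class `ForwardVsRoundRobinSketch` and the method `instancesPerKey` are hypothetical, keys are assigned to iteration sources by a simple modulo, and a parallelism mismatch is assumed to degrade to per-source round robin, as the mail describes.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model, not Flink code: records which map instances see each key.
class ForwardVsRoundRobinSketch {

    static Map<Integer, Set<Integer>> instancesPerKey(
            int[] keys, int sourceParallelism, int mapParallelism) {
        Map<Integer, Set<Integer>> seen = new TreeMap<>();
        int[] roundRobin = new int[sourceParallelism]; // one counter per source instance
        for (int key : keys) {
            // groupBy on the feedback edge: a key always reaches the same source instance.
            int source = Math.floorMod(key, sourceParallelism);
            int target;
            if (mapParallelism == sourceParallelism) {
                target = source; // forward: instance i feeds instance i
            } else {
                // Assumed behaviour on mismatch: each source cycles over all map instances.
                target = roundRobin[source] % mapParallelism;
                roundRobin[source]++;
            }
            seen.computeIfAbsent(key, k -> new TreeSet<>()).add(target);
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] keys = {1, 1, 1, 2, 2, 2};
        System.out.println(instancesPerKey(keys, 2, 2)); // map1: same parallelism as source
        System.out.println(instancesPerKey(keys, 2, 4)); // map2: higher parallelism
    }
}
```

In this model every key stays on a single instance of the parallelism-2 map, while the same key is spread over several instances of the parallelism-4 map, which is the loss of grouping the mail points out.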