There might be reasons why a user would want different parallelism at the
head operators (depending on what else that head operator might process),
so restricting them to the same parallelism is a little weird, don't you
think? It kind of goes against the whole operator-parallelism idea.
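For concreteness, here is a minimal, self-contained version of the kind of
job I mean, sketched against the 0.9-era DataStream API (the class name,
the v % 10 keying, the Decrement map and the print() sink are all made up
for illustration, so the details may differ):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MixedHeadParallelism {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, value) pairs; the key only exists so groupBy(0) is meaningful
        DataStream<Tuple2<Long, Long>> input = env.generateSequence(0, 1000)
                .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Long v) {
                        return new Tuple2<Long, Long>(v % 10, v);
                    }
                });

        IterativeDataStream<Tuple2<Long, Long>> ids = input.iterate();

        // Two head operators consuming the same iteration source,
        // deliberately given different parallelism:
        DataStream<Tuple2<Long, Long>> head1 =
                ids.map(new Decrement()).setParallelism(2);
        DataStream<Tuple2<Long, Long>> head2 =
                ids.map(new Decrement()).setParallelism(4);

        head1.print();

        // Feedback edge grouped by key: tuples with a positive value
        // go around the loop again.
        DataStream<Tuple2<Long, Long>> feedback =
                head2.filter(new FilterFunction<Tuple2<Long, Long>>() {
                    @Override
                    public boolean filter(Tuple2<Long, Long> t) {
                        return t.f1 > 0;
                    }
                });
        ids.closeWith(feedback.groupBy(0));

        env.execute("mixed head parallelism");
    }

    private static class Decrement
            implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> map(Tuple2<Long, Long> t) {
            return new Tuple2<Long, Long>(t.f0, t.f1 - 1);
        }
    }
}

If we forced all heads to share one parallelism, the second setParallelism
call above would simply have to be rejected.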
I don't think it's a huge complexity to group head operators together by
parallelism and add a source/sink pair per group, like we do now. What do
you say?

On Fri, 31 Jul 2015 at 17:10, Aljoscha Krettek <aljos...@apache.org> wrote:

> Yes, I'm not saying that it makes sense to do it, I'm just saying that it
> does translate and run. Your observation is true. :D
>
> I'm wondering whether it makes sense to allow users to have iteration
> heads with differing parallelism, in fact.
>
> On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I still don't get how it could possibly work, so let me tell you how I
> > see it, and correct me in my logic :)
> >
> > You have this program:
> >
> > ids.map1().setParallelism(2)
> > ids.map2().setParallelism(4)
> >
> > //...
> >
> > ids.closeWith(feedback.groupBy(0))
> >
> > You are suggesting that we only have one iteration source/sink pair,
> > with parallelism of either 2 or 4. I will assume that the parallelism
> > is 2 for the sake of the argument.
> >
> > The iteration source is connected to map1 and map2 with forward
> > partitioning, and the sink is connected with groupBy(0).
> > Each sink instance will receive all tuples of a given key, which also
> > means that each iteration source instance (2) will too.
> >
> > Now here comes the problem: the source will forward the tuples to map1,
> > and since we have a forward connection we maintain the groupBy
> > semantics (this is perfect). The sources will also forward to map2,
> > which has higher parallelism, so the tuple sending turns into
> > round-robin, which screws up the groupBy.
> >
> > What did I miss?
> > Gyula
> >
> > On Fri, 31 Jul 2015 at 14:59, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > > Yes, this would still work. For example, I have this crazy graph:
> > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> > > program: https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > >
> > > It works, and the implementation is very simple, actually.
> > >
> > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > > I mean that the head operators have different parallelism:
> > > >
> > > > IterativeDataStream ids = ...
> > > >
> > > > ids.map().setParallelism(2)
> > > > ids.map().setParallelism(4)
> > > >
> > > > //...
> > > >
> > > > ids.closeWith(feedback)
> > > >
> > > > On Fri, 31 Jul 2015 at 14:23, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >
> > > > > I thought about having some tighter restrictions here. My idea
> > > > > was to enforce that the feedback edges must have the same
> > > > > parallelism as the original input stream; otherwise shipping
> > > > > strategies such as "keyBy", "shuffle", and "rebalance" don't seem
> > > > > to make sense, because they would differ from the distribution of
> > > > > the original elements (at least IMHO). Maybe I'm wrong there,
> > > > > though.
> > > > >
> > > > > To me it seems intuitive that I get the feedback at the head the
> > > > > way I specify it at the tail. But maybe that's also just me... :D
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyf...@apache.org> wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > I am not sure what the intuitive behaviour is here. As you are
> > > > > > not applying a transformation on the feedback stream but passing
> > > > > > it to a closeWith method, I thought it was somehow natural that
> > > > > > it gets the partitioning of the iteration input, but maybe it's
> > > > > > not intuitive.
> > > > > >
> > > > > > If others also think that preserving the feedback partitioning
> > > > > > should be the default, I am not against it :)
> > > > > >
> > > > > > Btw, this still won't make it very simple. We still need as many
> > > > > > source/sink pairs as we have different parallelisms among the
> > > > > > head operators. Otherwise the forwarding logic won't work.
> > > > > >
> > > > > > Cheers,
> > > > > > Gyula
> > > > > >
> > > > > > On Fri, 31 Jul 2015 at 11:52, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > I'm currently working on making the StreamGraph generation
> > > > > > > more centralized (i.e. not spread across the different API
> > > > > > > classes). The question now is why we need to switch to
> > > > > > > preserve partitioning. Could we not make "preserve"
> > > > > > > partitioning the default, and if users want shuffle
> > > > > > > partitioning or anything else, they have to specify it
> > > > > > > manually when adding the feedback edge?
> > > > > > >
> > > > > > > This would make for a very simple scheme where the iteration
> > > > > > > sources are always connected to the heads using "forward" and
> > > > > > > the tails are connected to the iteration sinks using whatever
> > > > > > > partitioner was set by the user. This would make it more
> > > > > > > transparent than the current default of "shuffle" between the
> > > > > > > tails and iteration sinks.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > P.S. I know we had quite some discussion about introducing
> > > > > > > "preserve partitioning", but now, when I think of it, it
> > > > > > > should be the default... :D
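P.S. To make "group the head operators by parallelism" concrete, here is a
rough sketch of the bookkeeping I have in mind (hypothetical names and a
plain id -> parallelism map, not the actual StreamGraph code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeadGrouping {

    // Hypothetical helper: given head vertex id -> parallelism, return
    // parallelism -> head vertex ids. One iteration source/sink pair would
    // then be created per group, running at the group's parallelism.
    public static Map<Integer, List<Integer>> groupHeadsByParallelism(
            Map<Integer, Integer> headParallelism) {
        Map<Integer, List<Integer>> groups =
                new HashMap<Integer, List<Integer>>();
        for (Map.Entry<Integer, Integer> e : headParallelism.entrySet()) {
            List<Integer> sameParallelism = groups.get(e.getValue());
            if (sameParallelism == null) {
                sameParallelism = new ArrayList<Integer>();
                groups.put(e.getValue(), sameParallelism);
            }
            sameParallelism.add(e.getKey());
        }
        return groups;
    }
}

Each group's source would then be connected to its heads with forward
edges, so the partitioning chosen on the tail side survives for every head,
independently of the other groups.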