Maybe you can reuse some of the logic that is currently there on the
StreamGraph, with building StreamLoops first which will be used to generate
the sources and sinks right before building the JobGraph. This avoids the
need of knowing everything beforehand.

I actually added this to avoid the complexities that you are probably
facing now.

Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl. 31.,
P, 17:28):

> Sure it can be done, it's just more complex if you try to do it in a sane
> way without having the code that builds the StreamGraph all over the place.
> :D
>
> I'll try to come up with something. This is my current work in progress, by
> the way: https://github.com/aljoscha/flink/tree/stream-api-rework
>
> I managed to ban the StreamGraph from StreamExecutionEnvironment and the
> API classes such as DataStream. The API methods construct a Graph of
> Transformation Nodes and don't contain any information themselves. Then
> there is a StreamGraphGenerator that builds a StreamGraph from the
> transformations. The abstraction is very nice and simple, the only problem
> that remains are the differing-parallelism-iterations but I'll figure them
> out.
>
> P.S. The code is not well documented yet, but the base class for
> transformations is StreamTransformation. From there anyone who want's to
> check it out can find the other transformations.
>
> On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > There might be reasons why a user would want different parallelism at the
> > head operators (depending on what else that head operator might process)
> so
> > restricting them to the same parallelism is a little bit weird don't you
> > think? It kind of goes against the whole opeartors-parallelism idea.
> >
> > I don't think its a huge complexity to group head operators together by
> > parallelism and add a source/sink per each group like we do now. What do
> > you say?
> >
> > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl.
> 31.,
> > P, 17:10):
> >
> > > Yes, I'm not saying that it makes sense to do it, I'm just saying that
> it
> > > does translate and run. Your observation is true. :D
> > >
> > > I'm wondering whether it makes sense to allow users to have iteration
> > heads
> > > with differing parallelism, in fact.
> > >
> > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > > I still don't get how it could possibly work, let me tell you how I
> see
> > > and
> > > > correct me in my logic :)
> > > >
> > > > You have this program:
> > > > ids.map1().setParallelism(2)
> > > > ids.map2().setParallelism(4)
> > > >
> > > > //...
> > > >
> > > > ids.closeWith(feedback.groupBy(0))
> > > >
> > > > You are suggesting that we only have one iteration source/sink pair
> > with
> > > > parallelism of either 2 or 4. I will assume that the parallelism is 2
> > for
> > > > the sake of the argument.
> > > >
> > > > The iteration source is connected to map1 and map2 with Forward
> > > > partitioning and the sink is connected with groupBy(0).
> > > > Each sink instance will receive all tuples of a given key which also
> > > means
> > > > that each iteration source instance (2) will too.
> > > >
> > > > Now here comes the problem: the source will forward the tuples to
> map 1
> > > and
> > > > since we have forward connection we maintiain the groupby semantics
> > (this
> > > > is perfect.)  the sources will also forward to map 2 which has higher
> > > > parallelism so the tuple sending turns into round robin, which screws
> > up
> > > > the groupby.
> > > >
> > > > What did I miss?
> > > > Gyula
> > > >
> > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl.
> > > 31.,
> > > > P, 14:59):
> > > >
> > > > > Yes, this would still work. For example, I have this crazy graph:
> > > > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> > > program:
> > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > >
> > > > > It works, and the implementation is very simple, actually.
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.f...@gmail.com>
> > wrote:
> > > > >
> > > > > > I mean that the head operators have different parallelism:
> > > > > >
> > > > > > IterativeDataStream ids = ...
> > > > > >
> > > > > > ids.map().setParallelism(2)
> > > > > > ids.map().setParallelism(4)
> > > > > >
> > > > > > //...
> > > > > >
> > > > > > ids.closeWith(feedback)
> > > > > >
> > > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015.
> > júl.
> > > > > 31.,
> > > > > > P, 14:23):
> > > > > >
> > > > > > > I thought about having some tighter restrictions here. My idea
> > was
> > > to
> > > > > > > enforce that the feedback edges must have the same parallelism
> as
> > > the
> > > > > > > original input stream, otherwise shipping strategies such as
> > > "keyBy",
> > > > > > > "shuffle", "rebalance" don't seem to make sense because they
> > would
> > > > > differ
> > > > > > > from the distribution of the original elements (at least IMHO).
> > > Maybe
> > > > > I'm
> > > > > > > wrong there, though.
> > > > > > >
> > > > > > > To me it seems intuitive that I get the feedback at the head
> they
> > > > way I
> > > > > > > specify it at the tail. But maybe that's also just me... :D
> > > > > > >
> > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyf...@apache.org>
> > wrote:
> > > > > > >
> > > > > > > > Hey,
> > > > > > > >
> > > > > > > > I am not sure what is the intuitive behaviour here. As you
> are
> > > not
> > > > > > > applying
> > > > > > > > a transformation on the feedback stream but pass it to a
> > > closeWith
> > > > > > > method,
> > > > > > > > I thought it was somehow nature that it gets the partitioning
> > of
> > > > the
> > > > > > > > iteration input, but maybe its not intuitive.
> > > > > > > >
> > > > > > > > If others also think that preserving feedback partitioning
> > should
> > > > be
> > > > > > the
> > > > > > > > default I am not against it :)
> > > > > > > >
> > > > > > > > Btw, this still won't make it very simple. We still need as
> > many
> > > > > > > > source/sink pairs as we have different parallelism among the
> > head
> > > > > > > > operators. Otherwise the forwarding logic wont work.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Gyula
> > > > > > > >
> > > > > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont:
> > 2015.
> > > > júl.
> > > > > > > 31.,
> > > > > > > > P, 11:52):
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > I'm currently working on making the StreamGraph generation
> > more
> > > > > > > > centralized
> > > > > > > > > (i.e. not spread across the different API classes). The
> > > question
> > > > is
> > > > > > now
> > > > > > > > why
> > > > > > > > > we need to switch to preserve partitioning? Could we not
> make
> > > > > > > "preserve"
> > > > > > > > > partitioning the default and if users want to have shuffle
> > > > > > partitioning
> > > > > > > > or
> > > > > > > > > anything they have to specify it manually when adding the
> > > > feedback
> > > > > > > edge?
> > > > > > > > >
> > > > > > > > > This would make for a very simple scheme where the
> iteration
> > > > > sources
> > > > > > > are
> > > > > > > > > always connected to the heads using "forward" and the tails
> > are
> > > > > > > connected
> > > > > > > > > to the iteration sinks using whatever partitioner was set
> by
> > > the
> > > > > > user.
> > > > > > > > This
> > > > > > > > > would make it more transparent than the current default of
> > the
> > > > > > > "shuffle"
> > > > > > > > > betweens tails and iteration sinks.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Aljoscha
> > > > > > > > >
> > > > > > > > > P.S. I now we had quite some discussion about introducing
> > > > "preserve
> > > > > > > > > partitioning" but now, when I think of it it should be the
> > > > > default...
> > > > > > > :D
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to