Yes, that's what I was proposing in my second mail:

I thought about having some tighter restrictions here. My idea was to
enforce that the feedback edges must have the same parallelism as the
original input stream, otherwise shipping strategies such as "keyBy",
"shuffle", "rebalance" don't seem to make sense because they would differ
from the distribution of the original elements (at least IMHO). Maybe I'm
wrong there, though.

To me it seems intuitive that I get the feedback at the head the way I
specify it at the tail. But maybe that's also just me... :D
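
For reference, the round-robin concern Gyula raises further down the thread can be modelled in a few lines. This is only a toy sketch under stated assumptions, not Flink code: the class and method names are made up for illustration, key routing is assumed to be "hash modulo parallelism", and a forward connection into a head with a different parallelism is assumed to degrade to round robin, as described in the quoted mail.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model (hypothetical names, not the Flink API): records are keyed onto the
// iteration source instances and then handed on to a head operator.
final class FeedbackPartitioningSketch {

    // Assumed key routing: channel = hash(key) mod receiver parallelism.
    static int keyChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Returns, for every key, the set of head instances that received it.
    // Equal parallelism: forward (instance i feeds instance i).
    // Different parallelism: each source instance cycles round robin over the heads.
    static Map<Integer, Set<Integer>> headInstancesPerKey(
            int[] keys, int sourceParallelism, int headParallelism) {
        int[] nextChannel = new int[sourceParallelism];
        Map<Integer, Set<Integer>> seen = new TreeMap<>();
        for (int key : keys) {
            int source = keyChannel(key, sourceParallelism);
            int head;
            if (headParallelism == sourceParallelism) {
                head = source;
            } else {
                head = nextChannel[source];
                nextChannel[source] = (head + 1) % headParallelism;
            }
            seen.computeIfAbsent(key, k -> new TreeSet<>()).add(head);
        }
        return seen;
    }

    // True if every key was only ever seen by a single head instance.
    static boolean keysStayTogether(Map<Integer, Set<Integer>> seen) {
        return seen.values().stream().allMatch(s -> s.size() == 1);
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 0, 1, 0, 1, 0, 1};
        System.out.println("head with p=2: " + headInstancesPerKey(keys, 2, 2));
        System.out.println("head with p=4: " + headInstancesPerKey(keys, 2, 4));
    }
}
```

With a source parallelism of 2, the head with parallelism 2 sees each key on exactly one instance, while the head with parallelism 4 sees the same key on several instances, which is the situation where the keyed grouping is lost.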

On Mon, 3 Aug 2015 at 00:14 Stephan Ewen <se...@apache.org> wrote:

> This model strikes me as pretty complicated. Imagine the extra logic and
> code path necessary for proper checkpointing as well.
>
> Why not do a simple approach:
>   - There is one parallel head, one parallel tail, both with the same
> parallelism
>
>   - Any computation in between may have its own parallelism, no special
> cases
>
>   - If the tail does not have the same parallelism as the head, it will not
> be the tail, but flow will attach an additional tail operator. Between the
> original tail and the additional tail, the streams are redistributed to
> achieve the required parallelism.
>
> Wouldn't that give us the same and make things much easier? The batch
> iterations work that way, by the way.
>
>
>
> On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > To answer the question plain and simple: No, there are several different
> > parallel heads and tails.
> >
> > For example in this:
> > val iter = ds.iteration()
> >
> > val head_tail1 = iter.map().parallelism(2)
> > val head_tail2 = iter.map().parallelism(4)
> >
> > iter.closeWith(head_tail1.union(head_tail2))
> >
> > We have one head/tail pair with parallelism 2 and one with parallelism 4.
> >
> > Off the top of my head, I don't know what happens in this case though:
> >
> > val iter = ds.iteration()
> >
> > val head1 = iter.map().parallelism(2)
> > val head2 = iter.map().parallelism(4)
> >
> > val tail1 = head1.map().parallelism(6)
> > val tail2 = head2.map().parallelism(8)
> >
> > iter.closeWith(tail1.union(tail2))
> >
> > (Which is also tricky with the parallelism of the input stream)
> >
> >
> > On Sun, 2 Aug 2015 at 21:22 Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > In a streaming program when we create an IterativeDataStream, we
> > > practically mark the union point of some later feedback stream (the one
> > > passed in to closeWith(..)).
> > >
> > > The operators applied on this IterativeDataStream will receive the
> > > feedback input as well. We call the operators applied on the iterative
> > > DataStream head operators. We call the operators that produce the
> > > streams passed into closeWith tail operators. With this terminology we
> > > can have many heads and tails with varying parallelism.
> > >
> > > On Sun, 2 Aug 2015 at 20:16 Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > I don't get the discussion here, can you help me with what you mean
> > > > by "different iteration heads and tails"?
> > > >
> > > > An iteration does not have one parallel head and one parallel tail?
> > > >
> > > > On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > > > Maybe you can reuse some of the logic that is currently there on
> > > > > the StreamGraph, with building StreamLoops first which will be used
> > > > > to generate the sources and sinks right before building the
> > > > > JobGraph. This avoids the need of knowing everything beforehand.
> > > > >
> > > > > I actually added this to avoid the complexities that you are
> > > > > probably facing now.
> > > > >
> > > > > On Fri, 31 Jul 2015 at 17:28 Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > >
> > > > > > Sure it can be done, it's just more complex if you try to do it in
> > > > > > a sane way without having the code that builds the StreamGraph all
> > > > > > over the place. :D
> > > > > >
> > > > > > I'll try to come up with something. This is my current work in
> > > > > > progress, by the way:
> > > > > > https://github.com/aljoscha/flink/tree/stream-api-rework
> > > > > >
> > > > > > I managed to ban the StreamGraph from StreamExecutionEnvironment and
> > > > > > the API classes such as DataStream. The API methods construct a
> > > > > > Graph of Transformation Nodes and don't contain any information
> > > > > > themselves. Then there is a StreamGraphGenerator that builds a
> > > > > > StreamGraph from the transformations. The abstraction is very nice
> > > > > > and simple, the only problem that remains is the
> > > > > > differing-parallelism iterations, but I'll figure them out.
> > > > > >
> > > > > > P.S. The code is not well documented yet, but the base class for
> > > > > > transformations is StreamTransformation. From there anyone who
> > > > > > wants to check it out can find the other transformations.
> > > > > >
> > > > > > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > > There might be reasons why a user would want different
> > > > > > > parallelism at the head operators (depending on what else that
> > > > > > > head operator might process), so restricting them to the same
> > > > > > > parallelism is a little bit weird, don't you think? It kind of
> > > > > > > goes against the whole operators-parallelism idea.
> > > > > > >
> > > > > > > I don't think it's a huge complexity to group head operators
> > > > > > > together by parallelism and add a source/sink per each group like
> > > > > > > we do now. What do you say?
> > > > > > >
> > > > > > > On Fri, 31 Jul 2015 at 17:10 Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > >
> > > > > > > > Yes, I'm not saying that it makes sense to do it, I'm just
> > > > > > > > saying that it does translate and run. Your observation is
> > > > > > > > true. :D
> > > > > > > >
> > > > > > > > I'm wondering whether it makes sense to allow users to have
> > > > > > > > iteration heads with differing parallelism, in fact.
> > > > > > > >
> > > > > > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > I still don't get how it could possibly work, let me tell you
> > > > > > > > > how I see it, and correct me in my logic :)
> > > > > > > > >
> > > > > > > > > You have this program:
> > > > > > > > > ids.map1().setParallelism(2)
> > > > > > > > > ids.map2().setParallelism(4)
> > > > > > > > >
> > > > > > > > > //...
> > > > > > > > >
> > > > > > > > > ids.closeWith(feedback.groupBy(0))
> > > > > > > > >
> > > > > > > > > You are suggesting that we only have one iteration source/sink
> > > > > > > > > pair with parallelism of either 2 or 4. I will assume that the
> > > > > > > > > parallelism is 2 for the sake of the argument.
> > > > > > > > >
> > > > > > > > > The iteration source is connected to map1 and map2 with Forward
> > > > > > > > > partitioning and the sink is connected with groupBy(0).
> > > > > > > > > Each sink instance will receive all tuples of a given key, which
> > > > > > > > > also means that each iteration source instance (2) will too.
> > > > > > > > >
> > > > > > > > > Now here comes the problem: the source will forward the tuples
> > > > > > > > > to map1, and since we have a forward connection we maintain the
> > > > > > > > > groupBy semantics (this is perfect). The sources will also
> > > > > > > > > forward to map2, which has higher parallelism, so the tuple
> > > > > > > > > sending turns into round robin, which screws up the groupBy.
> > > > > > > > >
> > > > > > > > > What did I miss?
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Fri, 31 Jul 2015 at 14:59 Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Yes, this would still work. For example, I have this crazy
> > > > > > > > > > graph: http://postimg.org/image/xtv8ay8hv/full/
> > > > > > > > > > That results from this program:
> > > > > > > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > > > > > > >
> > > > > > > > > > It works, and the implementation is very simple, actually.
> > > > > > > > > >
> > > > > > > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > I mean that the head operators have different parallelism:
> > > > > > > > > > >
> > > > > > > > > > > IterativeDataStream ids = ...
> > > > > > > > > > >
> > > > > > > > > > > ids.map().setParallelism(2)
> > > > > > > > > > > ids.map().setParallelism(4)
> > > > > > > > > > >
> > > > > > > > > > > //...
> > > > > > > > > > >
> > > > > > > > > > > ids.closeWith(feedback)
> > > > > > > > > > >
> > > > > > > > > > > On Fri, 31 Jul 2015 at 14:23 Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I thought about having some tighter restrictions here. My idea
> > > > > > > > > > > > was to enforce that the feedback edges must have the same
> > > > > > > > > > > > parallelism as the original input stream, otherwise shipping
> > > > > > > > > > > > strategies such as "keyBy", "shuffle", "rebalance" don't seem
> > > > > > > > > > > > to make sense because they would differ from the distribution
> > > > > > > > > > > > of the original elements (at least IMHO). Maybe I'm wrong
> > > > > > > > > > > > there, though.
> > > > > > > > > > > >
> > > > > > > > > > > > To me it seems intuitive that I get the feedback at the head
> > > > > > > > > > > > the way I specify it at the tail. But maybe that's also just
> > > > > > > > > > > > me... :D
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyf...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am not sure what is the intuitive behaviour here. As you are
> > > > > > > > > > > > > not applying a transformation on the feedback stream but pass
> > > > > > > > > > > > > it to a closeWith method, I thought it was somehow natural that
> > > > > > > > > > > > > it gets the partitioning of the iteration input, but maybe it's
> > > > > > > > > > > > > not intuitive.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If others also think that preserving feedback partitioning
> > > > > > > > > > > > > should be the default I am not against it :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Btw, this still won't make it very simple. We still need as
> > > > > > > > > > > > > many source/sink pairs as we have different parallelism among
> > > > > > > > > > > > > the head operators. Otherwise the forwarding logic won't work.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Gyula
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, 31 Jul 2015 at 11:52 Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > I'm currently working on making the StreamGraph generation
> > > > > > > > > > > > > > more centralized (i.e. not spread across the different API
> > > > > > > > > > > > > > classes). The question is now why we need to switch to
> > > > > > > > > > > > > > preserve partitioning? Could we not make "preserve"
> > > > > > > > > > > > > > partitioning the default and if users want to have shuffle
> > > > > > > > > > > > > > partitioning or anything they have to specify it manually
> > > > > > > > > > > > > > when adding the feedback edge?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This would make for a very simple scheme where the iteration
> > > > > > > > > > > > > > sources are always connected to the heads using "forward" and
> > > > > > > > > > > > > > the tails are connected to the iteration sinks using whatever
> > > > > > > > > > > > > > partitioner was set by the user. This would make it more
> > > > > > > > > > > > > > transparent than the current default of the "shuffle" between
> > > > > > > > > > > > > > tails and iteration sinks.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > Aljoscha
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > P.S. I know we had quite some discussion about introducing
> > > > > > > > > > > > > > "preserve partitioning" but now, when I think of it, it
> > > > > > > > > > > > > > should be the default... :D
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
