Hi Felix,

I'm not sure whether PipelinedSubpartition should be readable more than
once because then it would effectively mean that we materialize the
elements of the pipelined subpartition for stragglers. Therefore, I think
that we should make blocking intermediate results readable more than once.
This will also be beneficial for interactive programs where we continue
from the results of previous Flink jobs.

It might also be interesting to have a blocking mode which schedules its
consumers once the first result is there. Thus, having a mixture of
pipelined and blocking mode.

Cheers,
Till

On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neut...@googlemail.com>
wrote:

> Hi Stephan,
>
> I did some research about blocking intermediate results. It turns out that
> neither PipelinedSubpartition (see line 178) nor blocking intermediate
> results (see SpillableSubpartition line: 189) can be read multiple times.
> Moreover blocking intermediate results are currently not supported in
> native iterations (see https://issues.apache.org/jira/browse/FLINK-1713 ).
> So there are three ways to solve this:
> 1) We extend Pipelined subpartitions to make it possible to read them
> multiple times
> 2) We extend Blocking subpartitions to make it possible to read them
> multiple times, but then we also have to fix FLINK-1713. So we can use
> broadcasts in native iterations
> 3) We create one pipelined subpartition for every taskmanager. Problem: The
> more taskmanager there are, the more redundant data we store, but the
> network traffic stays optimal.
>
> Thank you for your help,
> Felix
>
> 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
>
> > Hi Felix!
> >
> > Hope this helps_
> >
> > Concerning (1.1) - The producer does not think in term of number of
> target
> > TaskManagers. That number can, after all, change in the presence of a
> > failure and recovery. The producer should, for its own result, not care
> how
> > many consumers it will have (Tasks), but produce it only once.
> >
> > Concerning (1.2)  - Only "blocking" intermediate results can be consumed
> > multiple times. Data sent to broadcast variables must thus be always a
> > blocking intermediate result.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com>
> > wrote:
> >
> > > Hi Stephan,
> > >
> > > thanks for the great ideas. First I have some questions:
> > >
> > > 1.1) Does every task generate an intermediate result partition for
> every
> > > target task or is that already implemented in a way so that there are
> > only
> > > as many intermediate result partitions per task manager as target
> tasks?
> > > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > > intermediate result partitions per task manager or do we get 8?)
> > > 1.2) How can I consume an intermediate result partition multiple times?
> > > When I tried that I got the following exception:
> > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is
> > being
> > > or already has been consumed, but pipelined subpartitions can only be
> > > consumed once.
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > createReadView(PipelinedSubpartition.java:179)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > createReadView(PipelinedSubpartition.java:36)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.ResultPartition.
> > createSubpartitionView(ResultPartition.java:348)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.ResultPartitionManager.
> > createSubpartitionView(ResultPartitionManager.java:81)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.netty.
> PartitionRequestServerHandler.
> > channelRead0(PartitionRequestServerHandler.java:98)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.netty.
> PartitionRequestServerHandler.
> > channelRead0(PartitionRequestServerHandler.java:41)
> > > at
> > >
> > > io.netty.channel.SimpleChannelInboundHandler.channelRead(
> > SimpleChannelInboundHandler.java:105)
> > >
> > > My status update: Since Friday I am implementing your idea described in
> > > (2). Locally this approach already works (for less than 170
> iterations).
> > I
> > > will investigate further to solve that issue.
> > >
> > > But I am still not sure how to implement (1). Maybe we introduce a
> > similar
> > > construct like the BroadcastVariableManager to share the RecordWriter
> > among
> > > all tasks of a taskmanager. I am interested in your thoughts :)
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > Hi Felix!
> > > >
> > > > Interesting suggestion. Here are some thoughts on the design.
> > > >
> > > > The two core changes needed to send data once to the TaskManagers
> are:
> > > >
> > > >   (1) Every sender needs to produce its stuff once (rather than for
> > every
> > > > target task), there should not be redundancy there.
> > > >   (2) Every TaskManager should request the data once, other tasks in
> > the
> > > > same TaskManager pick it up from there.
> > > >
> > > >
> > > > The current receiver-initialted pull model is actually a good
> > abstraction
> > > > for that, I think.
> > > >
> > > > Lets look at (1):
> > > >
> > > >   - Currently, the TaskManagers have a separate intermediate result
> > > > partition for each target slot. They should rather have one
> > intermediate
> > > > result partition (saves also repeated serialization) that is consumed
> > > > multiple times.
> > > >
> > > >   - Since the results that are to be broadcasted are always
> "blocking",
> > > > they can be consumed (pulled)  multiples times.
> > > >
> > > > Lets look at (2):
> > > >
> > > >   - The current BroadcastVariableManager has the functionality to let
> > the
> > > > first accessor of the BC-variable materialize the result.
> > > >
> > > >   - It could be changed such that only the first accessor creates a
> > > > RecordReader, so the others do not even request the stream. That way,
> > the
> > > > TaskManager should pull only one stream from each producing task,
> which
> > > > means the data is transferred once.
> > > >
> > > >
> > > > That would also work perfectly with the current failure / recovery
> > model.
> > > >
> > > > What do you think?
> > > >
> > > > Stephan
> > > >
> > > >
> > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <
> neut...@googlemail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi everybody,
> > > > >
> > > > > I want to improve the performance of broadcasts in Flink. Therefore
> > > Till
> > > > > told me to start a FLIP on this topic to discuss how to go forward
> to
> > > > solve
> > > > > the current issues for broadcasts.
> > > > >
> > > > > The problem in a nutshell: Instead of sending data to each
> > taskmanager
> > > > only
> > > > > once, at the moment the data is sent to each task. This means if
> > there
> > > > are
> > > > > 3 slots on each taskmanager we will send the data 3 times instead
> of
> > > > once.
> > > > >
> > > > > There are multiple ways to tackle this problem and I started to do
> > some
> > > > > research and investigate. You can follow my thought process here:
> > > > >
> > > > >
> > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > 5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > >
> > > > > This is my first FLIP. So please correct me, if I did something
> > wrong.
> > > > >
> > > > > I am interested in your thoughts about how to solve this issue. Do
> > you
> > > > > think my approach is heading into the right direction or should we
> > > > follow a
> > > > > totally different one.
> > > > >
> > > > > I am happy about any comment :)
> > > > >
> > > > > Best regards,
> > > > > Felix
> > > > >
> > > >
> > >
> >
>

Reply via email to