Hi Till,

Thanks for the fast answer. I also think this is the way to go. Should I
open a new JIRA issue, "Make blocking SpillableSubpartition able to be read
multiple times"? Moreover, should I mark that issue and FLINK-1713
<https://issues.apache.org/jira/browse/FLINK-1713> as blocking for the
broadcast JIRA issue? What do you think?
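
To make the proposal a bit more concrete, here is a minimal sketch of what I
mean by "readable multiple times". All names in it (MultiReadSubpartition,
ReadView, getNextBuffer) are made up for illustration and are not the actual
Flink classes. It also assumes everything stays in memory, whereas the real
SpillableSubpartition would have to handle spilling and buffer recycling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a blocking subpartition; not Flink's actual class. */
final class MultiReadSubpartition {
    private final List<String> buffers = new ArrayList<>();
    private boolean finished;

    /** Producer side: append a buffer while the result is still being written. */
    synchronized void add(String buffer) {
        if (finished) {
            throw new IllegalStateException("Subpartition is already finished.");
        }
        buffers.add(buffer);
    }

    /** Producer side: mark the result as complete so consumers may start. */
    synchronized void finish() {
        finished = true;
    }

    /**
     * Consumer side: every call returns a fresh view with its own read
     * position, instead of throwing on the second request.
     */
    synchronized ReadView createReadView() {
        if (!finished) {
            throw new IllegalStateException("Blocking result is not finished yet.");
        }
        return new ReadView(Collections.unmodifiableList(new ArrayList<>(buffers)));
    }

    /** An independent cursor over the finished data. */
    static final class ReadView {
        private final List<String> data;
        private int position;

        ReadView(List<String> data) {
            this.data = data;
        }

        /** Returns the next buffer, or null once this view is exhausted. */
        String getNextBuffer() {
            return position < data.size() ? data.get(position++) : null;
        }
    }

    public static void main(String[] args) {
        MultiReadSubpartition subpartition = new MultiReadSubpartition();
        subpartition.add("a");
        subpartition.add("b");
        subpartition.finish();

        // Two consumers read the same finished result independently.
        ReadView first = subpartition.createReadView();
        ReadView second = subpartition.createReadView();
        System.out.println(first.getNextBuffer() + " " + second.getNextBuffer());
    }
}
```

The point of the sketch is only that each consumer gets its own read position
over the finished result, so a second read request no longer fails.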

Best regards,
Felix

2016-08-09 17:41 GMT+07:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Felix,
>
> I'm not sure whether PipelinedSubpartition should be readable more than
> once because then it would effectively mean that we materialize the
> elements of the pipelined subpartition for stragglers. Therefore, I think
> that we should make blocking intermediate results readable more than once.
> This will also be beneficial for interactive programs where we continue
> from the results of previous Flink jobs.
>
> It might also be interesting to have a blocking mode which schedules its
> consumers once the first result is there. That would give us a mixture of
> pipelined and blocking mode.
>
> Cheers,
> Till
>
> On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neut...@googlemail.com>
> wrote:
>
> > Hi Stephan,
> >
> > I did some research about blocking intermediate results. It turns out that
> > neither PipelinedSubpartition (see line 178) nor blocking intermediate
> > results (see SpillableSubpartition, line 189) can be read multiple times.
> > Moreover, blocking intermediate results are currently not supported in
> > native iterations (see https://issues.apache.org/jira/browse/FLINK-1713).
> > So there are three ways to solve this:
> > 1) We extend pipelined subpartitions to make it possible to read them
> > multiple times.
> > 2) We extend blocking subpartitions to make it possible to read them
> > multiple times, but then we also have to fix FLINK-1713 so that we can
> > use broadcasts in native iterations.
> > 3) We create one pipelined subpartition for every TaskManager. Problem:
> > the more TaskManagers there are, the more redundant data we store, but
> > the network traffic stays optimal.
> >
> > Thank you for your help,
> > Felix
> >
> > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
> >
> > > Hi Felix!
> > >
> > > Hope this helps:
> > >
> > > Concerning (1.1) - The producer does not think in terms of the number of
> > > target TaskManagers. That number can, after all, change in the presence
> > > of a failure and recovery. The producer should, for its own result, not
> > > care how many consumers (tasks) it will have, but produce it only once.
> > >
> > > Concerning (1.2) - Only "blocking" intermediate results can be consumed
> > > multiple times. Data sent to broadcast variables must thus always be a
> > > blocking intermediate result.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com>
> > > wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > Thanks for the great ideas. First, I have some questions:
> > > >
> > > > 1.1) Does every task generate an intermediate result partition for
> > > > every target task, or is it already implemented so that there are only
> > > > as many intermediate result partitions per TaskManager as target tasks?
> > > > (Example: There are 2 TaskManagers with 2 tasks each. Do we get 4
> > > > intermediate result partitions per TaskManager or do we get 8?)
> > > > 1.2) How can I consume an intermediate result partition multiple times?
> > > > When I tried that, I got the following exception:
> > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
> > > > or already has been consumed, but pipelined subpartitions can only be
> > > > consumed once.
> > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > > > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > > > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > > > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > >
> > > > My status update: Since Friday I have been implementing your idea
> > > > described in (2). Locally this approach already works (for fewer than
> > > > 170 iterations). I will investigate further to solve that issue.
> > > >
> > > > But I am still not sure how to implement (1). Maybe we could introduce
> > > > a construct similar to the BroadcastVariableManager to share the
> > > > RecordWriter among all tasks of a TaskManager. I am interested in your
> > > > thoughts :)
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
> > > >
> > > > > Hi Felix!
> > > > >
> > > > > Interesting suggestion. Here are some thoughts on the design.
> > > > >
> > > > > The two core changes needed to send data once to the TaskManagers are:
> > > > >
> > > > >   (1) Every sender needs to produce its data once (rather than for
> > > > > every target task); there should not be redundancy there.
> > > > >   (2) Every TaskManager should request the data once; other tasks in
> > > > > the same TaskManager pick it up from there.
> > > > >
> > > > >
> > > > > The current receiver-initiated pull model is actually a good
> > > > > abstraction for that, I think.
> > > > >
> > > > > Let's look at (1):
> > > > >
> > > > >   - Currently, the TaskManagers have a separate intermediate result
> > > > > partition for each target slot. They should rather have one
> > > > > intermediate result partition (which also saves repeated
> > > > > serialization) that is consumed multiple times.
> > > > >
> > > > >   - Since the results that are to be broadcast are always "blocking",
> > > > > they can be consumed (pulled) multiple times.
> > > > >
> > > > > Let's look at (2):
> > > > >
> > > > >   - The current BroadcastVariableManager has the functionality to let
> > > > > the first accessor of the BC-variable materialize the result.
> > > > >
> > > > >   - It could be changed such that only the first accessor creates a
> > > > > RecordReader, so the others do not even request the stream. That way,
> > > > > the TaskManager should pull only one stream from each producing task,
> > > > > which means the data is transferred once.
> > > > >
> > > > >
> > > > > That would also work perfectly with the current failure / recovery
> > > > > model.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi everybody,
> > > > > >
> > > > > > I want to improve the performance of broadcasts in Flink. Till
> > > > > > therefore told me to start a FLIP on this topic to discuss how to
> > > > > > go forward to solve the current issues with broadcasts.
> > > > > >
> > > > > > The problem in a nutshell: Instead of sending data to each
> > > > > > TaskManager only once, at the moment the data is sent to each task.
> > > > > > This means that if there are 3 slots on each TaskManager, we send
> > > > > > the data 3 times instead of once.
> > > > > >
> > > > > > There are multiple ways to tackle this problem, and I have started
> > > > > > to do some research and investigate. You can follow my thought
> > > > > > process here:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > > >
> > > > > > This is my first FLIP, so please correct me if I did something wrong.
> > > > > >
> > > > > > I am interested in your thoughts about how to solve this issue. Do
> > > > > > you think my approach is heading in the right direction, or should
> > > > > > we follow a totally different one?
> > > > > >
> > > > > > I am happy about any comment :)
> > > > > >
> > > > > > Best regards,
> > > > > > Felix
> > > > > >
> > > > >
> > > >
> > >
> >
>
