Hi Felix,

if we cannot work around the problem with blocking intermediate results in
iterations, then we have to make FLINK-1713 a blocker for this new issue.
But maybe you can also keep the current broadcasting mechanism to be used
within iterations only. Then we can address the iteration problem later.

Cheers,
Till

On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <neut...@googlemail.com>
wrote:

> Hi Till,
>
> thanks for the fast answer. I also think this should be the way to go. So
> should I open a new JIRA issue, "Make blocking SpillableSubpartition able to
> be read multiple times"? Moreover, should I mark this issue and FLINK-1713
> <https://issues.apache.org/jira/browse/FLINK-1713> as blocking for the
> broadcast JIRA issue? What do you think?
>
> Best regards,
> Felix
>
> 2016-08-09 17:41 GMT+07:00 Till Rohrmann <trohrm...@apache.org>:
>
> > Hi Felix,
> >
> > I'm not sure whether PipelinedSubpartition should be readable more than
> > once because then it would effectively mean that we materialize the
> > elements of the pipelined subpartition for stragglers. Therefore, I think
> > that we should make blocking intermediate results readable more than
> > once. This will also be beneficial for interactive programs where we
> > continue from the results of previous Flink jobs.
> >
> > It might also be interesting to have a blocking mode which schedules its
> > consumers once the first result is there, giving us a mixture of
> > pipelined and blocking mode.
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neut...@googlemail.com>
> > wrote:
> >
> > > Hi Stephan,
> > >
> > > I did some research about blocking intermediate results. It turns out
> > > that neither PipelinedSubpartition (see line 178) nor blocking
> > > intermediate results (see SpillableSubpartition, line 189) can be read
> > > multiple times. Moreover, blocking intermediate results are currently
> > > not supported in native iterations (see
> > > https://issues.apache.org/jira/browse/FLINK-1713).
> > > So there are three ways to solve this:
> > > 1) We extend pipelined subpartitions to make it possible to read them
> > > multiple times.
> > > 2) We extend blocking subpartitions to make it possible to read them
> > > multiple times, but then we also have to fix FLINK-1713 so that we can
> > > use broadcasts in native iterations.
> > > 3) We create one pipelined subpartition for every taskmanager. Problem:
> > > The more taskmanagers there are, the more redundant data we store, but
> > > the network traffic stays optimal.
> > >
> > > Thank you for your help,
> > > Felix
> > >
> > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > Hi Felix!
> > > >
> > > > Hope this helps:
> > > >
> > > > Concerning (1.1) - The producer does not think in terms of the number
> > > > of target TaskManagers. That number can, after all, change in the
> > > > presence of a failure and recovery. The producer should, for its own
> > > > result, not care how many consumers (Tasks) it will have, but produce
> > > > it only once.
> > > >
> > > > Concerning (1.2) - Only "blocking" intermediate results can be consumed
> > > > multiple times. Data sent to broadcast variables must thus always be a
> > > > blocking intermediate result.
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neut...@googlemail.com>
> > > > wrote:
> > > >
> > > > > Hi Stephan,
> > > > >
> > > > > thanks for the great ideas. First I have some questions:
> > > > >
> > > > > 1.1) Does every task generate an intermediate result partition for
> > > > > every target task, or is that already implemented in a way so that
> > > > > there are only as many intermediate result partitions per task
> > > > > manager as target tasks? (Example: There are 2 task managers with 2
> > > > > tasks each. Do we get 4 intermediate result partitions per task
> > > > > manager or do we get 8?)
> > > > > 1.2) How can I consume an intermediate result partition multiple
> > > > > times? When I tried that I got the following exception:
> > > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being or already has been consumed, but pipelined subpartitions can only be consumed once.
> > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > > > > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > > > > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > > > > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > > >
> > > > > My status update: Since Friday I have been implementing your idea
> > > > > described in (2). Locally this approach already works (for fewer than
> > > > > 170 iterations). I will investigate further to solve that issue.
> > > > >
> > > > > But I am still not sure how to implement (1). Maybe we introduce a
> > > > > construct similar to the BroadcastVariableManager to share the
> > > > > RecordWriter among all tasks of a taskmanager. I am interested in your
> > > > > thoughts :)
> > > > >
> > > > > Best regards,
> > > > > Felix
> > > > >
> > > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
> > > > >
> > > > > > Hi Felix!
> > > > > >
> > > > > > Interesting suggestion. Here are some thoughts on the design.
> > > > > >
> > > > > > The two core changes needed to send data once to the TaskManagers
> > > > > > are:
> > > > > >
> > > > > >   (1) Every sender needs to produce its data once (rather than for
> > > > > > every target task); there should not be redundancy there.
> > > > > >   (2) Every TaskManager should request the data once; other tasks
> > > > > > in the same TaskManager pick it up from there.
> > > > > >
> > > > > >
> > > > > > The current receiver-initiated pull model is actually a good
> > > > > > abstraction for that, I think.
> > > > > >
> > > > > > Let's look at (1):
> > > > > >
> > > > > >   - Currently, the TaskManagers have a separate intermediate result
> > > > > > partition for each target slot. They should rather have one
> > > > > > intermediate result partition (which also saves repeated
> > > > > > serialization) that is consumed multiple times.
> > > > > >
> > > > > >   - Since the results that are to be broadcast are always
> > > > > > "blocking", they can be consumed (pulled) multiple times.
> > > > > >
> > > > > > Let's look at (2):
> > > > > >
> > > > > >   - The current BroadcastVariableManager has the functionality to
> > > > > > let the first accessor of the BC-variable materialize the result.
> > > > > >
> > > > > >   - It could be changed such that only the first accessor creates a
> > > > > > RecordReader, so the others do not even request the stream. That
> > > > > > way, the TaskManager should pull only one stream from each
> > > > > > producing task, which means the data is transferred once.
> > > > > >
> > > > > >
> > > > > > That would also work perfectly with the current failure / recovery
> > > > > > model.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neut...@googlemail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi everybody,
> > > > > > >
> > > > > > > I want to improve the performance of broadcasts in Flink.
> > > > > > > Therefore Till told me to start a FLIP on this topic to discuss
> > > > > > > how to go forward to solve the current issues for broadcasts.
> > > > > > >
> > > > > > > The problem in a nutshell: Instead of sending data to each
> > > > > > > taskmanager only once, at the moment the data is sent to each
> > > > > > > task. This means that if there are 3 slots on each taskmanager,
> > > > > > > we will send the data 3 times instead of once.
> > > > > > >
> > > > > > > There are multiple ways to tackle this problem, and I started to
> > > > > > > do some research and investigate. You can follow my thought
> > > > > > > process here:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > > > >
> > > > > > > This is my first FLIP, so please correct me if I did something
> > > > > > > wrong.
> > > > > > >
> > > > > > > I am interested in your thoughts about how to solve this issue.
> > > > > > > Do you think my approach is heading in the right direction, or
> > > > > > > should we follow a totally different one?
> > > > > > >
> > > > > > > I am happy about any comment :)
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Felix
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
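
For readers following the thread, the difference between a result that can be consumed only once and one that can be read repeatedly can be illustrated with a toy model. This is only a sketch under stated assumptions and not Flink code: the classes ConsumeOnceBuffer and ReReadableBuffer and the helper drain are hypothetical names invented here, and the real PipelinedSubpartition and SpillableSubpartition are considerably more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Toy model (not Flink code) of consume-once vs. re-readable results. */
public class SubpartitionSketch {

    /** Hypothetical stand-in for a pipelined result: a single read view only. */
    static final class ConsumeOnceBuffer {
        private final List<String> records = new ArrayList<>();
        private boolean readViewCreated;

        void add(String record) {
            records.add(record);
        }

        Iterator<String> createReadView() {
            if (readViewCreated) {
                // Mirrors the spirit of the exception quoted in the thread.
                throw new IllegalStateException("already consumed; only one read view allowed");
            }
            readViewCreated = true;
            return records.iterator();
        }
    }

    /** Hypothetical stand-in for a finished blocking result: many read views. */
    static final class ReReadableBuffer {
        private final List<String> records = new ArrayList<>();
        private boolean finished;

        void add(String record) {
            if (finished) {
                throw new IllegalStateException("result is already finished");
            }
            records.add(record);
        }

        void finish() {
            finished = true;
        }

        Iterator<String> createReadView() {
            if (!finished) {
                throw new IllegalStateException("result is not finished yet");
            }
            // Every caller gets its own independent iterator over the same data.
            return Collections.unmodifiableList(records).iterator();
        }
    }

    /** Counts the records that a read view yields. */
    static int drain(Iterator<String> view) {
        int count = 0;
        while (view.hasNext()) {
            view.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        ReReadableBuffer blocking = new ReReadableBuffer();
        blocking.add("a");
        blocking.add("b");
        blocking.finish();
        System.out.println("first read:  " + drain(blocking.createReadView()));
        System.out.println("second read: " + drain(blocking.createReadView()));

        ConsumeOnceBuffer pipelined = new ConsumeOnceBuffer();
        pipelined.add("a");
        pipelined.createReadView();
        try {
            pipelined.createReadView();
        } catch (IllegalStateException e) {
            System.out.println("second view rejected: " + e.getMessage());
        }
    }
}
```

The re-readable variant only hands out views after finish(), which corresponds to the "blocking" property discussed above: consumers start once the whole result exists, so serving it again needs no extra materialization.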

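Point (2) of the design quoted above (only the first accessor on a TaskManager requests the data, the other tasks pick it up locally) might look roughly like the following. Again, this is a minimal sketch and not the actual BroadcastVariableManager: SharedBroadcastCache, getOrFetch and remoteFetchCount are hypothetical names, and the Supplier merely stands in for creating a RecordReader and pulling the stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Toy model (not Flink code) of "first accessor fetches, others reuse". */
public class SharedBroadcastSketch {

    /** Hypothetical per-TaskManager cache shared by all local tasks. */
    static final class SharedBroadcastCache<T> {
        private final ConcurrentMap<String, List<T>> materialized = new ConcurrentHashMap<>();
        private final AtomicInteger remoteFetches = new AtomicInteger();

        /**
         * Returns the broadcast data for the given name. Only the first caller
         * runs remoteFetch (an assumption standing in for the network pull);
         * later callers receive the already materialized list.
         */
        List<T> getOrFetch(String broadcastName, Supplier<List<T>> remoteFetch) {
            return materialized.computeIfAbsent(broadcastName, name -> {
                remoteFetches.incrementAndGet();
                return remoteFetch.get();
            });
        }

        int remoteFetchCount() {
            return remoteFetches.get();
        }
    }

    public static void main(String[] args) {
        SharedBroadcastCache<String> cache = new SharedBroadcastCache<>();
        // Three slots on the same TaskManager access the same broadcast variable.
        for (int slot = 0; slot < 3; slot++) {
            List<String> data = cache.getOrFetch("weights", () -> Arrays.asList("w0", "w1"));
            System.out.println("slot " + slot + " sees " + data.size() + " records");
        }
        System.out.println("remote fetches: " + cache.remoteFetchCount());
    }
}
```

ConcurrentHashMap.computeIfAbsent applies the mapping function at most once per key, so concurrent tasks racing for the same broadcast variable still trigger a single fetch. Release and cleanup of the cached data, which a real implementation would need, are left out here.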