Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-07-22 Thread Stephan Ewen
Hi Felix!

Interesting suggestion. Here are some thoughts on the design.

The two core changes needed to send data once to the TaskManagers are:

  (1) Every sender needs to produce its stuff once (rather than for every
target task), there should not be redundancy there.
  (2) Every TaskManager should request the data once, other tasks in the
same TaskManager pick it up from there.


The current receiver-initialted pull model is actually a good abstraction
for that, I think.

Lets look at (1):

  - Currently, the TaskManagers have a separate intermediate result
partition for each target slot. They should rather have one intermediate
result partition (saves also repeated serialization) that is consumed
multiple times.

  - Since the results that are to be broadcasted are always "blocking",
they can be consumed (pulled)  multiples times.

Lets look at (2):

  - The current BroadcastVariableManager has the functionality to let the
first accessor of the BC-variable materialize the result.

  - It could be changed such that only the first accessor creates a
RecordReader, so the others do not even request the stream. That way, the
TaskManager should pull only one stream from each producing task, which
means the data is transferred once.


That would also work perfectly with the current failure / recovery model.

What do you think?

Stephan


On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz 
wrote:

> Hi everybody,
>
> I want to improve the performance of broadcasts in Flink. Therefore Till
> told me to start a FLIP on this topic to discuss how to go forward to solve
> the current issues for broadcasts.
>
> The problem in a nutshell: Instead of sending data to each taskmanager only
> once, at the moment the data is sent to each task. This means if there are
> 3 slots on each taskmanager we will send the data 3 times instead of once.
>
> There are multiple ways to tackle this problem and I started to do some
> research and investigate. You can follow my thought process here:
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
>
> This is my first FLIP. So please correct me, if I did something wrong.
>
> I am interested in your thoughts about how to solve this issue. Do you
> think my approach is heading into the right direction or should we follow a
> totally different one.
>
> I am happy about any comment :)
>
> Best regards,
> Felix
>


Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-07-27 Thread Felix Neutatz
Hi Stephan,

thanks for the great ideas. First I have some questions:

1.1) Does every task generate an intermediate result partition for every
target task or is that already implemented in a way so that there are only
as many intermediate result partitions per task manager as target tasks?
(Example: There are 2 task managers with 2 tasks each. Do we get 4
intermediate result partitions per task manager or do we get 8?)
1.2) How can I consume an intermediate result partition multiple times?
When I tried that I got the following exception:
Caused by: java.lang.IllegalStateException: Subpartition 0 of
dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
or already has been consumed, but pipelined subpartitions can only be
consumed once.
at
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
at
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

My status update: Since Friday I am implementing your idea described in
(2). Locally this approach already works (for less than 170 iterations). I
will investigate further to solve that issue.

But I am still not sure how to implement (1). Maybe we introduce a similar
construct like the BroadcastVariableManager to share the RecordWriter among
all tasks of a taskmanager. I am interested in your thoughts :)

Best regards,
Felix

2016-07-22 17:25 GMT+02:00 Stephan Ewen :

> Hi Felix!
>
> Interesting suggestion. Here are some thoughts on the design.
>
> The two core changes needed to send data once to the TaskManagers are:
>
>   (1) Every sender needs to produce its stuff once (rather than for every
> target task), there should not be redundancy there.
>   (2) Every TaskManager should request the data once, other tasks in the
> same TaskManager pick it up from there.
>
>
> The current receiver-initialted pull model is actually a good abstraction
> for that, I think.
>
> Lets look at (1):
>
>   - Currently, the TaskManagers have a separate intermediate result
> partition for each target slot. They should rather have one intermediate
> result partition (saves also repeated serialization) that is consumed
> multiple times.
>
>   - Since the results that are to be broadcasted are always "blocking",
> they can be consumed (pulled)  multiples times.
>
> Lets look at (2):
>
>   - The current BroadcastVariableManager has the functionality to let the
> first accessor of the BC-variable materialize the result.
>
>   - It could be changed such that only the first accessor creates a
> RecordReader, so the others do not even request the stream. That way, the
> TaskManager should pull only one stream from each producing task, which
> means the data is transferred once.
>
>
> That would also work perfectly with the current failure / recovery model.
>
> What do you think?
>
> Stephan
>
>
> On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz 
> wrote:
>
> > Hi everybody,
> >
> > I want to improve the performance of broadcasts in Flink. Therefore Till
> > told me to start a FLIP on this topic to discuss how to go forward to
> solve
> > the current issues for broadcasts.
> >
> > The problem in a nutshell: Instead of sending data to each taskmanager
> only
> > once, at the moment the data is sent to each task. This means if there
> are
> > 3 slots on each taskmanager we will send the data 3 times instead of
> once.
> >
> > There are multiple ways to tackle this problem and I started to do some
> > research and investigate. You can follow my thought process here:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> >
> > This is my first FLIP. So please correct me, if I did something wrong.
> >
> > I am interested in your thoughts about how to solve this issue. Do you
> > think my approach is heading into the right direction or should we
> follow a
> > totally different one.
> >
> > I am happy about any comment :)
> >
> > Best regards,
> > Felix
> >
>


Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-01 Thread Stephan Ewen
Hi Felix!

Hope this helps_

Concerning (1.1) - The producer does not think in term of number of target
TaskManagers. That number can, after all, change in the presence of a
failure and recovery. The producer should, for its own result, not care how
many consumers it will have (Tasks), but produce it only once.

Concerning (1.2)  - Only "blocking" intermediate results can be consumed
multiple times. Data sent to broadcast variables must thus be always a
blocking intermediate result.

Greetings,
Stephan


On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz 
wrote:

> Hi Stephan,
>
> thanks for the great ideas. First I have some questions:
>
> 1.1) Does every task generate an intermediate result partition for every
> target task or is that already implemented in a way so that there are only
> as many intermediate result partitions per task manager as target tasks?
> (Example: There are 2 task managers with 2 tasks each. Do we get 4
> intermediate result partitions per task manager or do we get 8?)
> 1.2) How can I consume an intermediate result partition multiple times?
> When I tried that I got the following exception:
> Caused by: java.lang.IllegalStateException: Subpartition 0 of
> dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
> or already has been consumed, but pipelined subpartitions can only be
> consumed once.
> at
>
> org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> at
>
> org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> at
>
> org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> at
>
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> at
>
> org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> at
>
> org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> at
>
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
> My status update: Since Friday I am implementing your idea described in
> (2). Locally this approach already works (for less than 170 iterations). I
> will investigate further to solve that issue.
>
> But I am still not sure how to implement (1). Maybe we introduce a similar
> construct like the BroadcastVariableManager to share the RecordWriter among
> all tasks of a taskmanager. I am interested in your thoughts :)
>
> Best regards,
> Felix
>
> 2016-07-22 17:25 GMT+02:00 Stephan Ewen :
>
> > Hi Felix!
> >
> > Interesting suggestion. Here are some thoughts on the design.
> >
> > The two core changes needed to send data once to the TaskManagers are:
> >
> >   (1) Every sender needs to produce its stuff once (rather than for every
> > target task), there should not be redundancy there.
> >   (2) Every TaskManager should request the data once, other tasks in the
> > same TaskManager pick it up from there.
> >
> >
> > The current receiver-initialted pull model is actually a good abstraction
> > for that, I think.
> >
> > Lets look at (1):
> >
> >   - Currently, the TaskManagers have a separate intermediate result
> > partition for each target slot. They should rather have one intermediate
> > result partition (saves also repeated serialization) that is consumed
> > multiple times.
> >
> >   - Since the results that are to be broadcasted are always "blocking",
> > they can be consumed (pulled)  multiples times.
> >
> > Lets look at (2):
> >
> >   - The current BroadcastVariableManager has the functionality to let the
> > first accessor of the BC-variable materialize the result.
> >
> >   - It could be changed such that only the first accessor creates a
> > RecordReader, so the others do not even request the stream. That way, the
> > TaskManager should pull only one stream from each producing task, which
> > means the data is transferred once.
> >
> >
> > That would also work perfectly with the current failure / recovery model.
> >
> > What do you think?
> >
> > Stephan
> >
> >
> > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz 
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I want to improve the performance of broadcasts in Flink. Therefore
> Till
> > > told me to start a FLIP on this topic to discuss how to go forward to
> > solve
> > > the current issues for broadcasts.
> > >
> > > The problem in a nutshell: Instead of sending data to each taskmanager
> > only
> > > once, at the moment the data is sent to each task. This means if there
> > are
> > > 3 slots on each taskmanager we will send the data 3 times instead of
> > once.
> > >
> > > There are multiple ways to tackle this problem and I started to do some
> > > research and investigate. You can follow my thought process here:
> > >
> > >
> > >
> >
> https://cwiki

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-08 Thread Felix Neutatz
Hi Stephan,

I did some research about blocking intermediate results. It turns out that
neither PipelinedSubpartition (see line 178) nor blocking intermediate
results (see SpillableSubpartition line: 189) can be read multiple times.
Moreover blocking intermediate results are currently not supported in
native iterations (see https://issues.apache.org/jira/browse/FLINK-1713 ).
So there are three ways to solve this:
1) We extend Pipelined subpartitions to make it possible to read them
multiple times
2) We extend Blocking subpartitions to make it possible to read them
multiple times, but then we also have to fix FLINK-1713. So we can use
broadcasts in native iterations
3) We create one pipelined subpartition for every taskmanager. Problem: The
more taskmanager there are, the more redundant data we store, but the
network traffic stays optimal.

Thank you for your help,
Felix

2016-08-01 22:51 GMT+07:00 Stephan Ewen :

> Hi Felix!
>
> Hope this helps_
>
> Concerning (1.1) - The producer does not think in term of number of target
> TaskManagers. That number can, after all, change in the presence of a
> failure and recovery. The producer should, for its own result, not care how
> many consumers it will have (Tasks), but produce it only once.
>
> Concerning (1.2)  - Only "blocking" intermediate results can be consumed
> multiple times. Data sent to broadcast variables must thus be always a
> blocking intermediate result.
>
> Greetings,
> Stephan
>
>
> On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz 
> wrote:
>
> > Hi Stephan,
> >
> > thanks for the great ideas. First I have some questions:
> >
> > 1.1) Does every task generate an intermediate result partition for every
> > target task or is that already implemented in a way so that there are
> only
> > as many intermediate result partitions per task manager as target tasks?
> > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > intermediate result partitions per task manager or do we get 8?)
> > 1.2) How can I consume an intermediate result partition multiple times?
> > When I tried that I got the following exception:
> > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is
> being
> > or already has been consumed, but pipelined subpartitions can only be
> > consumed once.
> > at
> >
> > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> createReadView(PipelinedSubpartition.java:179)
> > at
> >
> > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> createReadView(PipelinedSubpartition.java:36)
> > at
> >
> > org.apache.flink.runtime.io.network.partition.ResultPartition.
> createSubpartitionView(ResultPartition.java:348)
> > at
> >
> > org.apache.flink.runtime.io.network.partition.ResultPartitionManager.
> createSubpartitionView(ResultPartitionManager.java:81)
> > at
> >
> > org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.
> channelRead0(PartitionRequestServerHandler.java:98)
> > at
> >
> > org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.
> channelRead0(PartitionRequestServerHandler.java:41)
> > at
> >
> > io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
> >
> > My status update: Since Friday I am implementing your idea described in
> > (2). Locally this approach already works (for less than 170 iterations).
> I
> > will investigate further to solve that issue.
> >
> > But I am still not sure how to implement (1). Maybe we introduce a
> similar
> > construct like the BroadcastVariableManager to share the RecordWriter
> among
> > all tasks of a taskmanager. I am interested in your thoughts :)
> >
> > Best regards,
> > Felix
> >
> > 2016-07-22 17:25 GMT+02:00 Stephan Ewen :
> >
> > > Hi Felix!
> > >
> > > Interesting suggestion. Here are some thoughts on the design.
> > >
> > > The two core changes needed to send data once to the TaskManagers are:
> > >
> > >   (1) Every sender needs to produce its stuff once (rather than for
> every
> > > target task), there should not be redundancy there.
> > >   (2) Every TaskManager should request the data once, other tasks in
> the
> > > same TaskManager pick it up from there.
> > >
> > >
> > > The current receiver-initialted pull model is actually a good
> abstraction
> > > for that, I think.
> > >
> > > Lets look at (1):
> > >
> > >   - Currently, the TaskManagers have a separate intermediate result
> > > partition for each target slot. They should rather have one
> intermediate
> > > result partition (saves also repeated serialization) that is consumed
> > > multiple times.
> > >
> > >   - Since the results that are to be broadcasted are always "blocking",
> > > they can be consumed (pulled)  multiples times.
> > >
> > > Lets look at (2):
> > >
> > >   - The current BroadcastVariableManager has the functionality to let
> the
> > > first accessor of the BC-variable materialize the result.
> > 

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-09 Thread Till Rohrmann
Hi Felix,

I'm not sure whether PipelinedSubpartition should be readable more than
once because then it would effectively mean that we materialize the
elements of the pipelined subpartition for stragglers. Therefore, I think
that we should make blocking intermediate results readable more than once.
This will also be beneficial for interactive programs where we continue
from the results of previous Flink jobs.

It might also be interesting to have a blocking mode which schedules its
consumers once the first result is there. Thus, having a mixture of
pipelined and blocking mode.

Cheers,
Till

On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz 
wrote:

> Hi Stephan,
>
> I did some research about blocking intermediate results. It turns out that
> neither PipelinedSubpartition (see line 178) nor blocking intermediate
> results (see SpillableSubpartition line: 189) can be read multiple times.
> Moreover blocking intermediate results are currently not supported in
> native iterations (see https://issues.apache.org/jira/browse/FLINK-1713 ).
> So there are three ways to solve this:
> 1) We extend Pipelined subpartitions to make it possible to read them
> multiple times
> 2) We extend Blocking subpartitions to make it possible to read them
> multiple times, but then we also have to fix FLINK-1713. So we can use
> broadcasts in native iterations
> 3) We create one pipelined subpartition for every taskmanager. Problem: The
> more taskmanager there are, the more redundant data we store, but the
> network traffic stays optimal.
>
> Thank you for your help,
> Felix
>
> 2016-08-01 22:51 GMT+07:00 Stephan Ewen :
>
> > Hi Felix!
> >
> > Hope this helps_
> >
> > Concerning (1.1) - The producer does not think in term of number of
> target
> > TaskManagers. That number can, after all, change in the presence of a
> > failure and recovery. The producer should, for its own result, not care
> how
> > many consumers it will have (Tasks), but produce it only once.
> >
> > Concerning (1.2)  - Only "blocking" intermediate results can be consumed
> > multiple times. Data sent to broadcast variables must thus be always a
> > blocking intermediate result.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz 
> > wrote:
> >
> > > Hi Stephan,
> > >
> > > thanks for the great ideas. First I have some questions:
> > >
> > > 1.1) Does every task generate an intermediate result partition for
> every
> > > target task or is that already implemented in a way so that there are
> > only
> > > as many intermediate result partitions per task manager as target
> tasks?
> > > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > > intermediate result partitions per task manager or do we get 8?)
> > > 1.2) How can I consume an intermediate result partition multiple times?
> > > When I tried that I got the following exception:
> > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is
> > being
> > > or already has been consumed, but pipelined subpartitions can only be
> > > consumed once.
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > createReadView(PipelinedSubpartition.java:179)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > createReadView(PipelinedSubpartition.java:36)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.ResultPartition.
> > createSubpartitionView(ResultPartition.java:348)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.partition.ResultPartitionManager.
> > createSubpartitionView(ResultPartitionManager.java:81)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.netty.
> PartitionRequestServerHandler.
> > channelRead0(PartitionRequestServerHandler.java:98)
> > > at
> > >
> > > org.apache.flink.runtime.io.network.netty.
> PartitionRequestServerHandler.
> > channelRead0(PartitionRequestServerHandler.java:41)
> > > at
> > >
> > > io.netty.channel.SimpleChannelInboundHandler.channelRead(
> > SimpleChannelInboundHandler.java:105)
> > >
> > > My status update: Since Friday I am implementing your idea described in
> > > (2). Locally this approach already works (for less than 170
> iterations).
> > I
> > > will investigate further to solve that issue.
> > >
> > > But I am still not sure how to implement (1). Maybe we introduce a
> > similar
> > > construct like the BroadcastVariableManager to share the RecordWriter
> > among
> > > all tasks of a taskmanager. I am interested in your thoughts :)
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen :
> > >
> > > > Hi Felix!
> > > >
> > > > Interesting suggestion. Here are some thoughts on the design.
> > > >
> > > > The two core changes needed to send data once to the TaskManagers
> are:
> > > >
> > > >   (1) Every sender needs to produce its stuff once (rather than for
> > every
> > > > target task), the

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-09 Thread Felix Neutatz
Hi Till,

thanks for the fast answer. I also think this should be the way to go. So
should I open a new jira "Make blocking SpillableSubpartition able to be
read multiple times". Moreover should I mark this jira and FLINK-1713
 as blocking for the
broadcast jira? What do you think?

Best regards,
Felix

2016-08-09 17:41 GMT+07:00 Till Rohrmann :

> Hi Felix,
>
> I'm not sure whether PipelinedSubpartition should be readable more than
> once because then it would effectively mean that we materialize the
> elements of the pipelined subpartition for stragglers. Therefore, I think
> that we should make blocking intermediate results readable more than once.
> This will also be beneficial for interactive programs where we continue
> from the results of previous Flink jobs.
>
> It might also be interesting to have a blocking mode which schedules its
> consumers once the first result is there. Thus, having a mixture of
> pipelined and blocking mode.
>
> Cheers,
> Till
>
> On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz 
> wrote:
>
> > Hi Stephan,
> >
> > I did some research about blocking intermediate results. It turns out
> that
> > neither PipelinedSubpartition (see line 178) nor blocking intermediate
> > results (see SpillableSubpartition line: 189) can be read multiple times.
> > Moreover blocking intermediate results are currently not supported in
> > native iterations (see https://issues.apache.org/jira/browse/FLINK-1713
> ).
> > So there are three ways to solve this:
> > 1) We extend Pipelined subpartitions to make it possible to read them
> > multiple times
> > 2) We extend Blocking subpartitions to make it possible to read them
> > multiple times, but then we also have to fix FLINK-1713. So we can use
> > broadcasts in native iterations
> > 3) We create one pipelined subpartition for every taskmanager. Problem:
> The
> > more taskmanager there are, the more redundant data we store, but the
> > network traffic stays optimal.
> >
> > Thank you for your help,
> > Felix
> >
> > 2016-08-01 22:51 GMT+07:00 Stephan Ewen :
> >
> > > Hi Felix!
> > >
> > > Hope this helps_
> > >
> > > Concerning (1.1) - The producer does not think in term of number of
> > target
> > > TaskManagers. That number can, after all, change in the presence of a
> > > failure and recovery. The producer should, for its own result, not care
> > how
> > > many consumers it will have (Tasks), but produce it only once.
> > >
> > > Concerning (1.2)  - Only "blocking" intermediate results can be
> consumed
> > > multiple times. Data sent to broadcast variables must thus be always a
> > > blocking intermediate result.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <
> neut...@googlemail.com>
> > > wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > thanks for the great ideas. First I have some questions:
> > > >
> > > > 1.1) Does every task generate an intermediate result partition for
> > every
> > > > target task or is that already implemented in a way so that there are
> > > only
> > > > as many intermediate result partitions per task manager as target
> > tasks?
> > > > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > > > intermediate result partitions per task manager or do we get 8?)
> > > > 1.2) How can I consume an intermediate result partition multiple
> times?
> > > > When I tried that I got the following exception:
> > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is
> > > being
> > > > or already has been consumed, but pipelined subpartitions can only be
> > > > consumed once.
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > > createReadView(PipelinedSubpartition.java:179)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > > createReadView(PipelinedSubpartition.java:36)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.ResultPartition.
> > > createSubpartitionView(ResultPartition.java:348)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.
> ResultPartitionManager.
> > > createSubpartitionView(ResultPartitionManager.java:81)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.netty.
> > PartitionRequestServerHandler.
> > > channelRead0(PartitionRequestServerHandler.java:98)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.netty.
> > PartitionRequestServerHandler.
> > > channelRead0(PartitionRequestServerHandler.java:41)
> > > > at
> > > >
> > > > io.netty.channel.SimpleChannelInboundHandler.channelRead(
> > > SimpleChannelInboundHandler.java:105)
> > > >
> > > > My status update: Since Friday I am implementing your idea described
> in
> > > > (2). Locally this approach already works (for less than 170
> > iterations).
> > > I
> > > > will investigate furthe

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-09 Thread Till Rohrmann
Hi Felix,

if we cannot work around the problem with blocking intermediate results in
iterations, then we have to make FLINK-1713 a blocker for this new issue.
But maybe you can also keep the current broadcasting mechanism to be used
within iterations only. Then we can address the iteration problem later.

Cheers,
Till

On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz 
wrote:

> Hi Till,
>
> thanks for the fast answer. I also think this should be the way to go. So
> should I open a new jira "Make blocking SpillableSubpartition able to be
> read multiple times". Moreover should I mark this jira and FLINK-1713
>  as blocking for the
> broadcast jira? What do you think?
>
> Best regards,
> Felix
>
> 2016-08-09 17:41 GMT+07:00 Till Rohrmann :
>
> > Hi Felix,
> >
> > I'm not sure whether PipelinedSubpartition should be readable more than
> > once because then it would effectively mean that we materialize the
> > elements of the pipelined subpartition for stragglers. Therefore, I think
> > that we should make blocking intermediate results readable more than
> once.
> > This will also be beneficial for interactive programs where we continue
> > from the results of previous Flink jobs.
> >
> > It might also be interesting to have a blocking mode which schedules its
> > consumers once the first result is there. Thus, having a mixture of
> > pipelined and blocking mode.
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz 
> > wrote:
> >
> > > Hi Stephan,
> > >
> > > I did some research about blocking intermediate results. It turns out
> > that
> > > neither PipelinedSubpartition (see line 178) nor blocking intermediate
> > > results (see SpillableSubpartition line: 189) can be read multiple
> times.
> > > Moreover blocking intermediate results are currently not supported in
> > > native iterations (see https://issues.apache.org/
> jira/browse/FLINK-1713
> > ).
> > > So there are three ways to solve this:
> > > 1) We extend Pipelined subpartitions to make it possible to read them
> > > multiple times
> > > 2) We extend Blocking subpartitions to make it possible to read them
> > > multiple times, but then we also have to fix FLINK-1713. So we can use
> > > broadcasts in native iterations
> > > 3) We create one pipelined subpartition for every taskmanager. Problem:
> > The
> > > more taskmanager there are, the more redundant data we store, but the
> > > network traffic stays optimal.
> > >
> > > Thank you for your help,
> > > Felix
> > >
> > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen :
> > >
> > > > Hi Felix!
> > > >
> > > > Hope this helps_
> > > >
> > > > Concerning (1.1) - The producer does not think in term of number of
> > > target
> > > > TaskManagers. That number can, after all, change in the presence of a
> > > > failure and recovery. The producer should, for its own result, not
> care
> > > how
> > > > many consumers it will have (Tasks), but produce it only once.
> > > >
> > > > Concerning (1.2)  - Only "blocking" intermediate results can be
> > consumed
> > > > multiple times. Data sent to broadcast variables must thus be always
> a
> > > > blocking intermediate result.
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <
> > neut...@googlemail.com>
> > > > wrote:
> > > >
> > > > > Hi Stephan,
> > > > >
> > > > > thanks for the great ideas. First I have some questions:
> > > > >
> > > > > 1.1) Does every task generate an intermediate result partition for
> > > every
> > > > > target task or is that already implemented in a way so that there
> are
> > > > only
> > > > > as many intermediate result partitions per task manager as target
> > > tasks?
> > > > > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > > > > intermediate result partitions per task manager or do we get 8?)
> > > > > 1.2) How can I consume an intermediate result partition multiple
> > times?
> > > > > When I tried that I got the following exception:
> > > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf
> is
> > > > being
> > > > > or already has been consumed, but pipelined subpartitions can only
> be
> > > > > consumed once.
> > > > > at
> > > > >
> > > > > org.apache.flink.runtime.io.network.partition.
> PipelinedSubpartition.
> > > > createReadView(PipelinedSubpartition.java:179)
> > > > > at
> > > > >
> > > > > org.apache.flink.runtime.io.network.partition.
> PipelinedSubpartition.
> > > > createReadView(PipelinedSubpartition.java:36)
> > > > > at
> > > > >
> > > > > org.apache.flink.runtime.io.network.partition.ResultPartition.
> > > > createSubpartitionView(ResultPartition.java:348)
> > > > > at
> > > > >
> > > > > org.apache.flink.runtime.io.network.partition.
> > ResultPartitionManager.
> > > > createSubpartitionView(ResultPartitionManager.java:81)
> > > > > at
> > > > >
> > > > > org.

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-09 Thread Stephan Ewen
I agree with Till. Changing the basic data exchange mechanism would screw
up many other ongoing efforts, like more incremental recovery.

It seems to make this properly applicable, we need to first un-specialize
the iterations.

(1) Allow for "versioned" intermediate results, i.e., result-x-superstep1,
result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
We need something similar for fined grained recovery in streaming
(result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
result-x-checkpoint4, ...) so it may be worth addressing that soon anyways.

(2) Make iterations not dependent on the special local back channel.
Then we can simply schedule iterations like all other things.

(3) Do the actual FLIP-5 proposal


That's quite an effort, but I fear all else will break the engine and other
efforts.

Best,
Stephan





On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann  wrote:

> Hi Felix,
>
> if we cannot work around the problem with blocking intermediate results in
> iterations, then we have to make FLINK-1713 a blocker for this new issue.
> But maybe you can also keep the current broadcasting mechanism to be used
> within iterations only. Then we can address the iteration problem later.
>
> Cheers,
> Till
>
> On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz 
> wrote:
>
> > Hi Till,
> >
> > thanks for the fast answer. I also think this should be the way to go. So
> > should I open a new jira "Make blocking SpillableSubpartition able to be
> > read multiple times". Moreover should I mark this jira and FLINK-1713
> >  as blocking for the
> > broadcast jira? What do you think?
> >
> > Best regards,
> > Felix
> >
> > 2016-08-09 17:41 GMT+07:00 Till Rohrmann :
> >
> > > Hi Felix,
> > >
> > > I'm not sure whether PipelinedSubpartition should be readable more than
> > > once because then it would effectively mean that we materialize the
> > > elements of the pipelined subpartition for stragglers. Therefore, I
> think
> > > that we should make blocking intermediate results readable more than
> > once.
> > > This will also be beneficial for interactive programs where we continue
> > > from the results of previous Flink jobs.
> > >
> > > It might also be interesting to have a blocking mode which schedules
> its
> > > consumers once the first result is there. Thus, having a mixture of
> > > pipelined and blocking mode.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz 
> > > wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > I did some research about blocking intermediate results. It turns out
> > > that
> > > > neither PipelinedSubpartition (see line 178) nor blocking
> intermediate
> > > > results (see SpillableSubpartition line: 189) can be read multiple
> > times.
> > > > Moreover blocking intermediate results are currently not supported in
> > > > native iterations (see https://issues.apache.org/
> > jira/browse/FLINK-1713
> > > ).
> > > > So there are three ways to solve this:
> > > > 1) We extend Pipelined subpartitions to make it possible to read them
> > > > multiple times
> > > > 2) We extend Blocking subpartitions to make it possible to read them
> > > > multiple times, but then we also have to fix FLINK-1713. So we can
> use
> > > > broadcasts in native iterations
> > > > 3) We create one pipelined subpartition for every taskmanager.
> Problem:
> > > The
> > > > more taskmanager there are, the more redundant data we store, but the
> > > > network traffic stays optimal.
> > > >
> > > > Thank you for your help,
> > > > Felix
> > > >
> > > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen :
> > > >
> > > > > Hi Felix!
> > > > >
> > > > > Hope this helps_
> > > > >
> > > > > Concerning (1.1) - The producer does not think in term of number of
> > > > target
> > > > > TaskManagers. That number can, after all, change in the presence
> of a
> > > > > failure and recovery. The producer should, for its own result, not
> > care
> > > > how
> > > > > many consumers it will have (Tasks), but produce it only once.
> > > > >
> > > > > Concerning (1.2)  - Only "blocking" intermediate results can be
> > > consumed
> > > > > multiple times. Data sent to broadcast variables must thus be
> always
> > a
> > > > > blocking intermediate result.
> > > > >
> > > > > Greetings,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <
> > > neut...@googlemail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Stephan,
> > > > > >
> > > > > > thanks for the great ideas. First I have some questions:
> > > > > >
> > > > > > 1.1) Does every task generate an intermediate result partition
> for
> > > > every
> > > > > > target task or is that already implemented in a way so that there
> > are
> > > > > only
> > > > > > as many intermediate result partitions per task manager as target
> > > > tasks?
> > > > > > (Example: There are 2 task managers with 2 tasks each. Do we get
> 4
> > > > > > intermediate resu

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-10 Thread Felix Neutatz
Hi everybody,

I found a quick and dirty way to make the blocking subpartition readable by
multiple readers. In the JobGraph generation I make all broadcast
partitions blocking (see more details here:
https://github.com/FelixNeutatz/incubator-flink/commits/blockingMultipleReads).
I want to point out that this branch is only experimental!

This works for the simple Map().withBroadcastSet() use case.

To test this approach, I run our peel bundle flink-broadcast (
https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
cluster. Ibm-power has 8 nodes and we scale the number of slots per node
from 1 - 16:

broadcast.ibm-power-1 broadcast.01 6597.33
broadcast.ibm-power-1 broadcast.02 5997
broadcast.ibm-power-1 broadcast.04 6576.67
broadcast.ibm-power-1 broadcast.08 7024.33
broadcast.ibm-power-1 broadcast.16 6933.33

The last row is the averaged run time in milliseconds over 3 runs. You can
clearly see, that the run time stays constant :)

As discussed, this approach doesn't work yet for native iterations (see
FLINK-1713).

So in the next weeks I will work on the native iterations as Stephan
proposed.

Best regards,
Felix



2016-08-09 21:29 GMT+07:00 Stephan Ewen :

> I agree with Till. Changing the basic data exchange mechanism would screw
> up many other ongoing efforts, like more incremental recovery.
>
> It seems to make this properly applicable, we need to first un-specialize
> the iterations.
>
> (1) Allow for "versioned" intermediate results, i.e., result-x-superstep1,
> result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
> We need something similar for fined grained recovery in streaming
> (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> result-x-checkpoint4, ...) so it may be worth addressing that soon anyways.
>
> (2) Make iterations not dependent on the special local back channel.
> Then we can simply schedule iterations like all other things.
>
> (3) Do the actual FLIP-5 proposal
>
>
> That's quite an effort, but I fear all else will break the engine and other
> efforts.
>
> Best,
> Stephan
>
>
>
>
>
> On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann 
> wrote:
>
> > Hi Felix,
> >
> > if we cannot work around the problem with blocking intermediate results
> in
> > iterations, then we have to make FLINK-1713 a blocker for this new issue.
> > But maybe you can also keep the current broadcasting mechanism to be used
> > within iterations only. Then we can address the iteration problem later.
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz 
> > wrote:
> >
> > > Hi Till,
> > >
> > > thanks for the fast answer. I also think this should be the way to go.
> So
> > > should I open a new jira "Make blocking SpillableSubpartition able to
> be
> > > read multiple times". Moreover should I mark this jira and FLINK-1713
> > >  as blocking for the
> > > broadcast jira? What do you think?
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann :
> > >
> > > > Hi Felix,
> > > >
> > > > I'm not sure whether PipelinedSubpartition should be readable more
> than
> > > > once because then it would effectively mean that we materialize the
> > > > elements of the pipelined subpartition for stragglers. Therefore, I
> > think
> > > > that we should make blocking intermediate results readable more than
> > > once.
> > > > This will also be beneficial for interactive programs where we
> continue
> > > > from the results of previous Flink jobs.
> > > >
> > > > It might also be interesting to have a blocking mode which schedules
> > its
> > > > consumers once the first result is there. Thus, having a mixture of
> > > > pipelined and blocking mode.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <
> neut...@googlemail.com>
> > > > wrote:
> > > >
> > > > > Hi Stephan,
> > > > >
> > > > > I did some research about blocking intermediate results. It turns
> out
> > > > that
> > > > > neither PipelinedSubpartition (see line 178) nor blocking
> > intermediate
> > > > > results (see SpillableSubpartition line: 189) can be read multiple
> > > times.
> > > > > Moreover blocking intermediate results are currently not supported
> in
> > > > > native iterations (see https://issues.apache.org/
> > > jira/browse/FLINK-1713
> > > > ).
> > > > > So there are three ways to solve this:
> > > > > 1) We extend Pipelined subpartitions to make it possible to read
> them
> > > > > multiple times
> > > > > 2) We extend Blocking subpartitions to make it possible to read
> them
> > > > > multiple times, but then we also have to fix FLINK-1713. So we can
> > use
> > > > > broadcasts in native iterations
> > > > > 3) We create one pipelined subpartition for every taskmanager.
> > Problem:
> > > > The
> > > > > more taskmanager there are, the more redundant data we store, but
> the
> > > > > network traffic stays optimal.

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-10 Thread Stephan Ewen
Cool, nice results!

For the iteration unspecialization - we probably should design this hand in
hand with the streaming fault tolerance, as they share the notion of
"intermediate result versions".


On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz 
wrote:

> Hi everybody,
>
> I found a quick and dirty way to make the blocking subpartition readable by
> multiple readers. In the JobGraph generation I make all broadcast
> partitions blocking (see more details here:
> https://github.com/FelixNeutatz/incubator-flink/
> commits/blockingMultipleReads).
> I want to point out that this branch is only experimental!
>
> This works for the simple Map().withBroadcastSet() use case.
>
> To test this approach, I run our peel bundle flink-broadcast (
> https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
> cluster. Ibm-power has 8 nodes and we scale the number of slots per node
> from 1 - 16:
>
> broadcast.ibm-power-1 broadcast.01 6597.33
> broadcast.ibm-power-1 broadcast.02 5997
> broadcast.ibm-power-1 broadcast.04 6576.67
> broadcast.ibm-power-1 broadcast.08 7024.33
> broadcast.ibm-power-1 broadcast.16 6933.33
>
> The last row is the averaged run time in milliseconds over 3 runs. You can
> clearly see, that the run time stays constant :)
>
> As discussed, this approach doesn't work yet for native iterations (see
> FLINK-1713).
>
> So in the next weeks I will work on the native iterations as Stephan
> proposed.
>
> Best regards,
> Felix
>
>
>
> 2016-08-09 21:29 GMT+07:00 Stephan Ewen :
>
> > I agree with Till. Changing the basic data exchange mechanism would screw
> > up many other ongoing efforts, like more incremental recovery.
> >
> > It seems to make this properly applicable, we need to first un-specialize
> > the iterations.
> >
> > (1) Allow for "versioned" intermediate results, i.e.,
> result-x-superstep1,
> > result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
> > We need something similar for fined grained recovery in streaming
> > (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> > result-x-checkpoint4, ...) so it may be worth addressing that soon
> anyways.
> >
> > (2) Make iterations not dependent on the special local back channel.
> > Then we can simply schedule iterations like all other things.
> >
> > (3) Do the actual FLIP-5 proposal
> >
> >
> > That's quite an effort, but I fear all else will break the engine and
> other
> > efforts.
> >
> > Best,
> > Stephan
> >
> >
> >
> >
> >
> > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann 
> > wrote:
> >
> > > Hi Felix,
> > >
> > > if we cannot work around the problem with blocking intermediate results
> > in
> > > iterations, then we have to make FLINK-1713 a blocker for this new
> issue.
> > > But maybe you can also keep the current broadcasting mechanism to be
> used
> > > within iterations only. Then we can address the iteration problem
> later.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz 
> > > wrote:
> > >
> > > > Hi Till,
> > > >
> > > > thanks for the fast answer. I also think this should be the way to
> go.
> > So
> > > > should I open a new jira "Make blocking SpillableSubpartition able to
> > be
> > > > read multiple times". Moreover should I mark this jira and FLINK-1713
> > > >  as blocking for
> the
> > > > broadcast jira? What do you think?
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann :
> > > >
> > > > > Hi Felix,
> > > > >
> > > > > I'm not sure whether PipelinedSubpartition should be readable more
> > than
> > > > > once because then it would effectively mean that we materialize the
> > > > > elements of the pipelined subpartition for stragglers. Therefore, I
> > > think
> > > > > that we should make blocking intermediate results readable more
> than
> > > > once.
> > > > > This will also be beneficial for interactive programs where we
> > continue
> > > > > from the results of previous Flink jobs.
> > > > >
> > > > > It might also be interesting to have a blocking mode which
> schedules
> > > its
> > > > > consumers once the first result is there. Thus, having a mixture of
> > > > > pipelined and blocking mode.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <
> > neut...@googlemail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Stephan,
> > > > > >
> > > > > > I did some research about blocking intermediate results. It turns
> > out
> > > > > that
> > > > > > neither PipelinedSubpartition (see line 178) nor blocking
> > > intermediate
> > > > > > results (see SpillableSubpartition line: 189) can be read
> multiple
> > > > times.
> > > > > > Moreover blocking intermediate results are currently not
> supported
> > in
> > > > > > native iterations (see https://issues.apache.org/
> > > > jira/browse/FLINK-1713
> > > > > ).
> > > > > > So there are three ways to so

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-10 Thread Till Rohrmann
Cool first version Felix :-)

On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen  wrote:

> Cool, nice results!
>
> For the iteration unspecialization - we probably should design this hand in
> hand with the streaming fault tolerance, as they share the notion of
> "intermediate result versions".
>
>
> On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz 
> wrote:
>
> > Hi everybody,
> >
> > I found a quick and dirty way to make the blocking subpartition readable
> by
> > multiple readers. In the JobGraph generation I make all broadcast
> > partitions blocking (see more details here:
> > https://github.com/FelixNeutatz/incubator-flink/
> > commits/blockingMultipleReads).
> > I want to point out that this branch is only experimental!
> >
> > This works for the simple Map().withBroadcastSet() use case.
> >
> > To test this approach, I run our peel bundle flink-broadcast (
> > https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
> > cluster. Ibm-power has 8 nodes and we scale the number of slots per node
> > from 1 - 16:
> >
> > broadcast.ibm-power-1 broadcast.01 6597.33
> > broadcast.ibm-power-1 broadcast.02 5997
> > broadcast.ibm-power-1 broadcast.04 6576.67
> > broadcast.ibm-power-1 broadcast.08 7024.33
> > broadcast.ibm-power-1 broadcast.16 6933.33
> >
> > The last row is the averaged run time in milliseconds over 3 runs. You
> can
> > clearly see, that the run time stays constant :)
> >
> > As discussed, this approach doesn't work yet for native iterations (see
> > FLINK-1713).
> >
> > So in the next weeks I will work on the native iterations as Stephan
> > proposed.
> >
> > Best regards,
> > Felix
> >
> >
> >
> > 2016-08-09 21:29 GMT+07:00 Stephan Ewen :
> >
> > > I agree with Till. Changing the basic data exchange mechanism would
> screw
> > > up many other ongoing efforts, like more incremental recovery.
> > >
> > > It seems to make this properly applicable, we need to first
> un-specialize
> > > the iterations.
> > >
> > > (1) Allow for "versioned" intermediate results, i.e.,
> > result-x-superstep1,
> > > result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
> > > We need something similar for fined grained recovery in streaming
> > > (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> > > result-x-checkpoint4, ...) so it may be worth addressing that soon
> > anyways.
> > >
> > > (2) Make iterations not dependent on the special local back channel.
> > > Then we can simply schedule iterations like all other things.
> > >
> > > (3) Do the actual FLIP-5 proposal
> > >
> > >
> > > That's quite an effort, but I fear all else will break the engine and
> > other
> > > efforts.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann 
> > > wrote:
> > >
> > > > Hi Felix,
> > > >
> > > > if we cannot work around the problem with blocking intermediate
> results
> > > in
> > > > iterations, then we have to make FLINK-1713 a blocker for this new
> > issue.
> > > > But maybe you can also keep the current broadcasting mechanism to be
> > used
> > > > within iterations only. Then we can address the iteration problem
> > later.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <
> neut...@googlemail.com>
> > > > wrote:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > thanks for the fast answer. I also think this should be the way to
> > go.
> > > So
> > > > > should I open a new jira "Make blocking SpillableSubpartition able
> to
> > > be
> > > > > read multiple times". Moreover should I mark this jira and
> FLINK-1713
> > > > >  as blocking for
> > the
> > > > > broadcast jira? What do you think?
> > > > >
> > > > > Best regards,
> > > > > Felix
> > > > >
> > > > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann :
> > > > >
> > > > > > Hi Felix,
> > > > > >
> > > > > > I'm not sure whether PipelinedSubpartition should be readable
> more
> > > than
> > > > > > once because then it would effectively mean that we materialize
> the
> > > > > > elements of the pipelined subpartition for stragglers.
> Therefore, I
> > > > think
> > > > > > that we should make blocking intermediate results readable more
> > than
> > > > > once.
> > > > > > This will also be beneficial for interactive programs where we
> > > continue
> > > > > > from the results of previous Flink jobs.
> > > > > >
> > > > > > It might also be interesting to have a blocking mode which
> > schedules
> > > > its
> > > > > > consumers once the first result is there. Thus, having a mixture
> of
> > > > > > pipelined and blocking mode.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <
> > > neut...@googlemail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Stephan,
> > > > > > >
> > > > > > > I did some research about blocking intermediate results. It
> turns
> > > out
> > > > >

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-11-10 Thread Felix Neutatz
Hi everybody,

the previous approach turned out to have an issue. Since we only write to
one subpartition, we have N-1 empty subpartitions per Task (where N =
degree of parallelism). In the current approach I didn't consume these
empty subpartitions. When you don't consume a subpartition it won't be
released. So we have a memory leak.

One workaround would be to read the empty subpartitions. But this is a
really ugly work-around.

So I had a chat with Till and we decided to create only one subpartition
instead of N subpartitions per task. I have already implemented this
approach.

Now the problem is that we need to know, when to release this subpartition.
We will create M subpartition-views per subpartition (where M is the number
of task managers & M <= N).

There are many ways to solve this problem:
1. Tell the subpartition how many taskmanagers will consume it.
(=> propagate M)
2. All tasks which don't need to read the subpartition, send a message to
the subpartition. So the subpartition will receive M release requests and
N-M "I am done" requests. So when the subpartition knows the number of
parallelism N, we are fine. (=> propagate N)

Any thoughts how to tackle this problem?

Best regards,
Felix

2016-08-10 19:14 GMT+02:00 Till Rohrmann :

> Cool first version Felix :-)
>
> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen  wrote:
>
> > Cool, nice results!
> >
> > For the iteration unspecialization - we probably should design this hand
> in
> > hand with the streaming fault tolerance, as they share the notion of
> > "intermediate result versions".
> >
> >
> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz 
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I found a quick and dirty way to make the blocking subpartition
> readable
> > by
> > > multiple readers. In the JobGraph generation I make all broadcast
> > > partitions blocking (see more details here:
> > > https://github.com/FelixNeutatz/incubator-flink/
> > > commits/blockingMultipleReads).
> > > I want to point out that this branch is only experimental!
> > >
> > > This works for the simple Map().withBroadcastSet() use case.
> > >
> > > To test this approach, I run our peel bundle flink-broadcast (
> > > https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
> > > cluster. Ibm-power has 8 nodes and we scale the number of slots per
> node
> > > from 1 - 16:
> > >
> > > broadcast.ibm-power-1 broadcast.01 6597.33
> > > broadcast.ibm-power-1 broadcast.02 5997
> > > broadcast.ibm-power-1 broadcast.04 6576.67
> > > broadcast.ibm-power-1 broadcast.08 7024.33
> > > broadcast.ibm-power-1 broadcast.16 6933.33
> > >
> > > The last row is the averaged run time in milliseconds over 3 runs. You
> > can
> > > clearly see, that the run time stays constant :)
> > >
> > > As discussed, this approach doesn't work yet for native iterations (see
> > > FLINK-1713).
> > >
> > > So in the next weeks I will work on the native iterations as Stephan
> > > proposed.
> > >
> > > Best regards,
> > > Felix
> > >
> > >
> > >
> > > 2016-08-09 21:29 GMT+07:00 Stephan Ewen :
> > >
> > > > I agree with Till. Changing the basic data exchange mechanism would
> > screw
> > > > up many other ongoing efforts, like more incremental recovery.
> > > >
> > > > It seems to make this properly applicable, we need to first
> > un-specialize
> > > > the iterations.
> > > >
> > > > (1) Allow for "versioned" intermediate results, i.e.,
> > > result-x-superstep1,
> > > > result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
> > > > We need something similar for fined grained recovery in streaming
> > > > (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> > > > result-x-checkpoint4, ...) so it may be worth addressing that soon
> > > anyways.
> > > >
> > > > (2) Make iterations not dependent on the special local back channel.
> > > > Then we can simply schedule iterations like all other things.
> > > >
> > > > (3) Do the actual FLIP-5 proposal
> > > >
> > > >
> > > > That's quite an effort, but I fear all else will break the engine and
> > > other
> > > > efforts.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann 
> > > > wrote:
> > > >
> > > > > Hi Felix,
> > > > >
> > > > > if we cannot work around the problem with blocking intermediate
> > results
> > > > in
> > > > > iterations, then we have to make FLINK-1713 a blocker for this new
> > > issue.
> > > > > But maybe you can also keep the current broadcasting mechanism to
> be
> > > used
> > > > > within iterations only. Then we can address the iteration problem
> > > later.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <
> > neut...@googlemail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Till,
> > > > > >
> > > > > > thanks for the fast answer. I also think this should be the way
> to
> > > go.
> > > > So
> > > > > > should I open a 

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-12-02 Thread Felix Neutatz
Hi everybody,

I implemented the second approach (see https://cwiki.apache.org/confl
uence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskman
ager+once+for+broadcasts). So each subpartition will be read by m tasks (m
= number of task managers) and the other tasks will notify the subpartition
that they don't need to read. This solves the problem, and we release the
subpartition just when we don't need it anymore.

The message I sent for all task which don't need to read is "
notifySubpartitionConsumed()"
https://github.com/FelixNeutatz/incubator-flink/blob/1b58d9c9df89620f2557b59e7fde40ffe04f49d8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L460

This means I first have to connect via a PartitionRequest and then i will
notify all channels.
One problem of using the standard PartitionRequest is that we will already
fill the first buffer:
https://github.com/FelixNeutatz/incubator-flink/blob/oneSubpartition/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L123

So my question is: is that ok, or:
1) should we introduce another Netty.Message
PartitionRequestAndNotifyConsumed
2) should we extend the PartitionRequest with the attribute "boolean
getAhead"

Current problems:
Native iterations:
Native iterations work but are not optimized. Theoretically, in the case of
native iterations we can also notify the subpartitions instead of reading
them, but at the moment I get the following exception when I do so:
java.lang.IllegalStateException: Queried for a buffer before requesting the
subpartition.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.consumer.Local
InputChannel.getNextBuffer(LocalInputChannel.java:152)
at org.apache.flink.runtime.io.network.partition.consumer.Singl
eInputGate.getNextBufferOrEvent(SingleInputGate.java:424)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
dReader.getNextRecord(AbstractRecordReader.java:87)
at org.apache.flink.runtime.io.network.api.reader.MutableRecord
Reader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(
ReaderIterator.java:73)
at org.apache.flink.runtime.broadcast.BroadcastVariableMaterial
ization.materializeVariable(BroadcastVariableMaterialization.java:114)

In general, it seems to me that this reduces network traffic even for
pipelined partitions in the "old" architecture. I will further investigate
why this is not working. For a simple map job with pipelined partitions
this already works. So there has to be some kind of iteration specific
thing which keeps that from working.

But for now I would propose to first push the improvements to Flink without
the iteration improvements. The overhead of the two code paths are just 2
lines of codes, which is really little.

I am happy to hear your thoughts :)

Best regards,
Felix

2016-11-10 12:24 GMT+01:00 Felix Neutatz :

> Hi everybody,
>
> the previous approach turned out to have an issue. Since we only write to
> one subpartition, we have N-1 empty subpartitions per Task (where N =
> degree of parallelism). In the current approach I didn't consume these
> empty subpartitions. When you don't consume a subpartition it won't be
> released. So we have a memory leak.
>
> One workaround would be to read the empty subpartitions. But this is a
> really ugly work-around.
>
> So I had a chat with Till and we decided to create only one subpartition
> instead of N subpartitions per task. I have already implemented this
> approach.
>
> Now the problem is that we need to know, when to release this
> subpartition. We will create M subpartition-views per subpartition (where M
> is the number of task managers & M <= N).
>
> There are many ways to solve this problem:
> 1. Tell the subpartition how many taskmanagers will consume it.
> (=> propagate M)
> 2. All tasks which don't need to read the subpartition, send a message to
> the subpartition. So the subpartition will receive M release requests and
> N-M "I am done" requests. So when the subpartition knows the number of
> parallelism N, we are fine. (=> propagate N)
>
> Any thoughts how to tackle this problem?
>
> Best regards,
> Felix
>
> 2016-08-10 19:14 GMT+02:00 Till Rohrmann :
>
>> Cool first version Felix :-)
>>
>> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen  wrote:
>>
>> > Cool, nice results!
>> >
>> > For the iteration unspecialization - we probably should design this
>> hand in
>> > hand with the streaming fault tolerance, as they share the notion of
>> > "intermediate result versions".
>> >
>> >
>> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz 
>> > wrote:
>> >
>> > > Hi everybody,
>> > >
>> > > I found a quick and dirty way to make the blocking subpartition
>> readable
>> > by
>> > > multiple readers. In the JobGraph generation I make all broadcast
>> > > partitions blocking (see more details here:
>> > > https://github.c