Hi Till,

thanks for the fast answer. I also think this should be the way to go. So
should I open a new JIRA issue, "Make blocking SpillableSubpartition able to
be read multiple times"? Moreover, should I mark this issue and FLINK-1713
<https://issues.apache.org/jira/browse/FLINK-1713> as blocking for the
broadcast JIRA issue? What do you think?

Best regards,
Felix

2016-08-09 17:41 GMT+07:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Felix,
>
> I'm not sure whether PipelinedSubpartition should be readable more than
> once because then it would effectively mean that we materialize the
> elements of the pipelined subpartition for stragglers. Therefore, I think
> that we should make blocking intermediate results readable more than once.
> This will also be beneficial for interactive programs where we continue
> from the results of previous Flink jobs.
>
> It might also be interesting to have a blocking mode which schedules its
> consumers once the first result is there. Thus, having a mixture of
> pipelined and blocking mode.
>
> Cheers,
> Till
>
> On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neut...@googlemail.com>
> wrote:
>
> > Hi Stephan,
> >
> > I did some research about blocking intermediate results. It turns out
> > that neither PipelinedSubpartition (see line 178) nor blocking
> > intermediate results (see SpillableSubpartition line: 189) can be read
> > multiple times. Moreover, blocking intermediate results are currently
> > not supported in native iterations (see
> > https://issues.apache.org/jira/browse/FLINK-1713).
> > So there are three ways to solve this:
> > 1) We extend pipelined subpartitions to make it possible to read them
> >    multiple times.
> > 2) We extend blocking subpartitions to make it possible to read them
> >    multiple times, but then we also have to fix FLINK-1713, so that we
> >    can use broadcasts in native iterations.
> > 3) We create one pipelined subpartition for every taskmanager. Problem:
> >    The more taskmanagers there are, the more redundant data we store,
> >    but the network traffic stays optimal.
> >
> > Thank you for your help,
> > Felix
> >
> > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <se...@apache.org>:
> >
> > > Hi Felix!
> > >
> > > Hope this helps.
> > >
> > > Concerning (1.1) - The producer does not think in terms of the number
> > > of target TaskManagers. That number can, after all, change in the
> > > presence of a failure and recovery. The producer should, for its own
> > > result, not care how many consumers it will have (Tasks), but produce
> > > it only once.
> > >
> > > Concerning (1.2) - Only "blocking" intermediate results can be
> > > consumed multiple times. Data sent to broadcast variables must thus
> > > always be a blocking intermediate result.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz
> > > <neut...@googlemail.com> wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > thanks for the great ideas. First I have some questions:
> > > >
> > > > 1.1) Does every task generate an intermediate result partition for
> > > > every target task or is that already implemented in a way so that
> > > > there are only as many intermediate result partitions per task
> > > > manager as target tasks? (Example: There are 2 task managers with 2
> > > > tasks each. Do we get 4 intermediate result partitions per task
> > > > manager or do we get 8?)
> > > > 1.2) How can I consume an intermediate result partition multiple
> > > > times? When I tried that I got the following exception:
> > > >
> > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being or already has been consumed, but pipelined subpartitions can only be consumed once.
> > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > > > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > > > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > > > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > >
> > > > My status update: Since Friday I am implementing your idea
> > > > described in (2). Locally this approach already works (for less
> > > > than 170 iterations). I will investigate further to solve that
> > > > issue.
> > > >
> > > > But I am still not sure how to implement (1). Maybe we introduce a
> > > > construct similar to the BroadcastVariableManager to share the
> > > > RecordWriter among all tasks of a taskmanager. I am interested in
> > > > your thoughts :)
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <se...@apache.org>:
> > > >
> > > > > Hi Felix!
> > > > >
> > > > > Interesting suggestion. Here are some thoughts on the design.
> > > > >
> > > > > The two core changes needed to send data once to the
> > > > > TaskManagers are:
> > > > >
> > > > >   (1) Every sender needs to produce its stuff once (rather than
> > > > > for every target task), there should not be redundancy there.
> > > > >   (2) Every TaskManager should request the data once, other
> > > > > tasks in the same TaskManager pick it up from there.
> > > > >
> > > > >
> > > > > The current receiver-initiated pull model is actually a good
> > > > > abstraction for that, I think.
> > > > >
> > > > > Let's look at (1):
> > > > >
> > > > >   - Currently, the TaskManagers have a separate intermediate
> > > > > result partition for each target slot. They should rather have
> > > > > one intermediate result partition (saves also repeated
> > > > > serialization) that is consumed multiple times.
> > > > >
> > > > >   - Since the results that are to be broadcasted are always
> > > > > "blocking", they can be consumed (pulled) multiple times.
> > > > >
> > > > > Let's look at (2):
> > > > >
> > > > >   - The current BroadcastVariableManager has the functionality
> > > > > to let the first accessor of the BC-variable materialize the
> > > > > result.
> > > > >
> > > > >   - It could be changed such that only the first accessor
> > > > > creates a RecordReader, so the others do not even request the
> > > > > stream. That way, the TaskManager should pull only one stream
> > > > > from each producing task, which means the data is transferred
> > > > > once.
> > > > >
> > > > >
> > > > > That would also work perfectly with the current failure /
> > > > > recovery model.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz
> > > > > <neut...@googlemail.com> wrote:
> > > > >
> > > > > > Hi everybody,
> > > > > >
> > > > > > I want to improve the performance of broadcasts in Flink.
> > > > > > Therefore Till told me to start a FLIP on this topic to
> > > > > > discuss how to go forward to solve the current issues for
> > > > > > broadcasts.
> > > > > >
> > > > > > The problem in a nutshell: Instead of sending data to each
> > > > > > taskmanager only once, at the moment the data is sent to each
> > > > > > task. This means if there are 3 slots on each taskmanager we
> > > > > > will send the data 3 times instead of once.
> > > > > >
> > > > > > There are multiple ways to tackle this problem and I started
> > > > > > to do some research and investigate. You can follow my
> > > > > > thought process here:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > > > >
> > > > > > This is my first FLIP. So please correct me, if I did
> > > > > > something wrong.
> > > > > >
> > > > > > I am interested in your thoughts about how to solve this
> > > > > > issue. Do you think my approach is heading in the right
> > > > > > direction or should we follow a totally different one?
> > > > > >
> > > > > > I am happy about any comment :)
> > > > > >
> > > > > > Best regards,
> > > > > > Felix
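
For readers following the design, here is a minimal sketch of the idea in point (2) of the thread: the first task on a TaskManager that accesses a broadcast variable fetches and materializes it, and the other tasks on the same TaskManager reuse that result. This is not Flink code. SharedBroadcastCache, BroadcastOnceSketch, countFetches and the key "bc-var" are hypothetical names made up for illustration, and the "fetch" is just a counter standing in for a network transfer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for per-TaskManager shared broadcast state; not a Flink class. */
final class SharedBroadcastCache<T> {
    private final ConcurrentHashMap<String, T> materialized = new ConcurrentHashMap<>();

    /** The first caller for a key runs the fetch; later callers reuse the stored result. */
    T getOrFetch(String key, Supplier<T> fetch) {
        // computeIfAbsent runs the mapping function at most once per absent key.
        return materialized.computeIfAbsent(key, k -> fetch.get());
    }
}

final class BroadcastOnceSketch {
    /** Simulates `slots` tasks on one TaskManager and returns how many fetches happened. */
    static int countFetches(int slots) {
        SharedBroadcastCache<List<Integer>> cache = new SharedBroadcastCache<>();
        AtomicInteger fetches = new AtomicInteger();
        for (int slot = 0; slot < slots; slot++) {
            // Every task asks for the same broadcast variable.
            cache.getOrFetch("bc-var", () -> {
                fetches.incrementAndGet();       // stands in for one network transfer
                return Arrays.asList(1, 2, 3);   // stands in for the broadcast data
            });
        }
        return fetches.get();
    }

    public static void main(String[] args) {
        System.out.println("fetches for 3 slots: " + countFetches(3));
    }
}
```

With 3 slots the sketch counts a single fetch, which mirrors the "once instead of 3 times" goal from the original mail. A real implementation would additionally have to deal with failure and recovery and with releasing the materialized data, which this sketch ignores.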