[ 
https://issues.apache.org/jira/browse/FLINK-3052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015638#comment-15015638
 ] 

ASF GitHub Bot commented on FLINK-3052:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1388

    [FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates

    When a candidate for a bulk iteration is instantiated, then the optimizer 
creates candidates
    for the step function. It is then checked that there exists a candidate 
solution for the step
    function whose properties met the properties of the input to the bulk 
iteration. Sometimes
    it is necessary to add a no-op plan node to the end of the step function to 
generate the
    correct properties. These new candidates have to be added to the final set 
of the accepted
    candidates.
    
    This commit adds that these new candidates are properly added to the set of 
accepted candidates.
    
    Fix test and add new iteration tests
    
    Add predecessor operator and dynamic path information to no op operator in 
bulk iterations

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixBulkIteration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1388
    
----
commit 27e2085da7423bbb114ece8b9a0f19339c41a696
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2015-11-19T13:24:15Z

    [FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates
    
    When a candidate for a bulk iteration is instantiated, then the optimizer 
creates candidates
    for the step function. It is then checked that there exists a candidate 
solution for the step
    function whose properties met the properties of the input to the bulk 
iteration. Sometimes
    it is necessary to add a no-op plan node to the end of the step function to 
generate the
    correct properties. These new candidates have to be added to the final set 
of the accepted
    candidates.
    
    This commit adds that these new candidates are properly added to the set of 
accepted candidates.
    
    Fix test and add new iteration tests
    
    Add predecessor operator and dynamic path information to no op operator in 
bulk iterations

----


> Optimizer does not push properties out of bulk iterations
> ---------------------------------------------------------
>
>                 Key: FLINK-3052
>                 URL: https://issues.apache.org/jira/browse/FLINK-3052
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 0.10.1
>
>
> Flink's optimizer should be able to reuse interesting properties from outside 
> the loop. In order to do that it is sometimes necessary to append a NoOp node 
> to the step function which recomputes the required properties.
> This is currently not working for {{BulkIterations}}, because the plans with 
> the appended NoOp nodes are not added to the overall list of candidates.
> This not only leads to sub-optimal plan selection but sometimes to the 
> rejection of valid jobs. The following job, for example, will be falsely 
> rejected by flink.
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>               DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 
> 10).map(new MapFunction<Long, Tuple1<Long>>() {
>                       @Override
>                       public Tuple1<Long> map(Long value) throws Exception {
>                               return new Tuple1<>(value);
>                       }
>               });
>               DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 
> 10).map(new MapFunction<Long, Tuple1<Long>>() {
>                       @Override
>                       public Tuple1<Long> map(Long value) throws Exception {
>                               return new Tuple1<>(value);
>                       }
>               });
>               DataSet<Tuple1<Long>> distinctInput = input1.distinct();
>               IterativeDataSet<Tuple1<Long>> iteration = 
> distinctInput.iterate(10);
>               DataSet<Tuple1<Long>> iterationStep = iteration
>                               .coGroup(input2)
>                               .where(0)
>                               .equalTo(0)
>                               .with(new CoGroupFunction<Tuple1<Long>, 
> Tuple1<Long>, Tuple1<Long>>() {
>                                       @Override
>                                       public void coGroup(
>                                                       Iterable<Tuple1<Long>> 
> first,
>                                                       Iterable<Tuple1<Long>> 
> second,
>                                                       Collector<Tuple1<Long>> 
> out) throws Exception {
>                                               Iterator<Tuple1<Long>> it = 
> first.iterator();
>                                               if (it.hasNext()) {
>                                                       out.collect(it.next());
>                                               }
>                                       }
>                               });
>               DataSet<Tuple1<Long>> iterationResult = 
> iteration.closeWith(iterationStep);
>               iterationResult.output(new 
> DiscardingOutputFormat<Tuple1<Long>>());
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to