Repository: flink Updated Branches: refs/heads/master 60ec68308 -> bd96ba8d1
[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp This closes #641 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb321d6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb321d6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb321d6 Branch: refs/heads/master Commit: adb321d61cc783b3a2a78f4e707104d75e1d63c0 Parents: 60ec683 Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Apr 30 17:34:02 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue May 5 22:56:36 2015 +0200 ---------------------------------------------------------------------- .../plantranslate/JobGraphGenerator.java | 5 +- .../plantranslate/TempInIterationsTest.java | 81 ++++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index dc21c13..2630019 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -1163,8 +1163,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> { final TempMode tm = channel.getTempMode(); boolean needsMemory = false; - // Don't add a pipeline breaker if the data exchange is already blocking. 
- if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) { + // Don't add a pipeline breaker if the data exchange is already blocking, EXCEPT the channel is within an iteration. + if (tm.breaksPipeline() && + (channel.isOnDynamicPath() || channel.getDataExchangeMode() != DataExchangeMode.BATCH) ) { config.setInputAsynchronouslyMaterialized(inputNum, true); needsMemory = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java new file mode 100644 index 0000000..15cb03f --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.plantranslate; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class TempInIterationsTest { + + /* + * Tests that temp barriers are correctly set within iterations, i.e. that the input of the "SolutionSet Delta" vertex of a delta iteration is asynchronously materialized (temped). The plan is only compiled to a JobGraph and never executed, so the non-existent input file is never read. + */ + @Test + public void testTempInIterationTest() throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> input = env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = + input.iterateDelta(input, 1, 0); + + DataSet<Tuple2<Long, Long>> update = iteration.getWorkset() + .join(iteration.getSolutionSet()).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + iteration.closeWith(update, update) + .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + + + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = (new Optimizer(new Configuration())).compile(plan); + + JobGraphGenerator jgg = new JobGraphGenerator(); + JobGraph jg = jgg.compileJobGraph(oPlan); + + boolean solutionSetUpdateChecked = false; + for(AbstractJobVertex v : jg.getVertices()) { + if(v.getName().equals("SolutionSet Delta")) { + + // 
check if input of solution set delta is temped + TaskConfig tc = new TaskConfig(v.getConfiguration()); + assertTrue(tc.isInputAsynchronouslyMaterialized(0)); + solutionSetUpdateChecked = true; + } + } + assertTrue(solutionSetUpdateChecked); + + } + +}