Repository: flink
Updated Branches:
  refs/heads/master 60ec68308 -> bd96ba8d1


[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp

This closes #641


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb321d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb321d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb321d6

Branch: refs/heads/master
Commit: adb321d61cc783b3a2a78f4e707104d75e1d63c0
Parents: 60ec683
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Apr 30 17:34:02 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue May 5 22:56:36 2015 +0200

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  5 +-
 .../plantranslate/TempInIterationsTest.java     | 81 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index dc21c13..2630019 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1163,8 +1163,9 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                        final TempMode tm = channel.getTempMode();
 
                        boolean needsMemory = false;
-                       // Don't add a pipeline breaker if the data exchange is 
already blocking.
-                       if (tm.breaksPipeline() && 
channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
+                       // Don't add a pipeline breaker if the data exchange is 
already blocking, EXCEPT the channel is within an iteration.
+                       if (tm.breaksPipeline() &&
+                                       (channel.isOnDynamicPath() || 
channel.getDataExchangeMode() != DataExchangeMode.BATCH) ) {
                                
config.setInputAsynchronouslyMaterialized(inputNum, true);
                                needsMemory = true;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
new file mode 100644
index 0000000..15cb03f
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TempInIterationsTest {
+
+       /*
+        * Tests whether temps barriers are correctly set in within iterations
+        */
+       @Test
+       public void testTempInIterationTest() throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Long, Long>> input = 
env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class);
+
+               DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration =
+                               input.iterateDelta(input, 1, 0);
+
+               DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
+                               
.join(iteration.getSolutionSet()).where(0).equalTo(0)
+                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+               iteration.closeWith(update, update)
+                               .output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>());
+
+
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = (new Optimizer(new 
Configuration())).compile(plan);
+
+               JobGraphGenerator jgg = new JobGraphGenerator();
+               JobGraph jg = jgg.compileJobGraph(oPlan);
+
+               boolean solutionSetUpdateChecked = false;
+               for(AbstractJobVertex v : jg.getVertices()) {
+                       if(v.getName().equals("SolutionSet Delta")) {
+
+                               // check if input of solution set delta is 
temped
+                               TaskConfig tc = new 
TaskConfig(v.getConfiguration());
+                               
assertTrue(tc.isInputAsynchronouslyMaterialized(0));
+                               solutionSetUpdateChecked = true;
+                       }
+               }
+               assertTrue(solutionSetUpdateChecked);
+
+       }
+
+}

Reply via email to