[optimizer] Migrate first set of tests (branching plans) to new API

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2db1812
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2db1812
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2db1812

Branch: refs/heads/master
Commit: c2db18120c53dc8712b08369f7ea5b93ace98c6b
Parents: a9150b3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 17 16:14:50 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 20 10:21:14 2015 +0100

----------------------------------------------------------------------
 .../optimizer/BranchingPlansCompilerTest.java   | 414 ++++++++-----------
 1 file changed, 180 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
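The migration applies one pattern throughout: plans that were assembled by hand
with the old record API (FileDataSource, MapOperator.builder(...), FileDataSink,
new Plan(...)) are now built with the DataSet API and obtained through
ExecutionEnvironment.createProgramPlan(). A minimal sketch of the new-API shape
used by the migrated tests, reusing identifiers from the diff below (the
CompilerTestBase harness with DEFAULT_PARALLELISM and compileNoStats is assumed):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setDegreeOfParallelism(DEFAULT_PARALLELISM);

    DataSet<Long> mapped = env.generateSequence(1, 10000)
            .map(new IdentityMapper<Long>());
    mapped.output(new DiscardingOutputFormat<Long>());

    // the environment collects the sinks and produces the Plan
    Plan plan = env.createProgramPlan();
    OptimizedPlan oPlan = compileNoStats(plan);

    // compile to a job graph to verify that no error is thrown
    new JobGraphGenerator().compileJobGraph(oPlan);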


http://git-wip-us.apache.org/repos/asf/flink/blob/c2db1812/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index ff0e004..916aa27 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
@@ -26,6 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -73,39 +78,27 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
                final int SINKS = 5;
        
                try {
-                       List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-
-                       // construct the plan
-                       final String out1Path = "file:///test/1";
-                       final String out2Path = "file:///test/2";
-
-                       FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
-                       MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build();
-                       MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build();
-
-                       FileDataSink[] sinkA = new FileDataSink[SINKS];
-                       FileDataSink[] sinkB = new FileDataSink[SINKS];
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+                       DataSet<Long> source = env.generateSequence(1, 10000);
+
+                       DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+                       DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
+
                        for (int sink = 0; sink < SINKS; sink++) {
-                               sinkA[sink] = new FileDataSink(DummyOutputFormat.class, out1Path, mapA, "Sink A:" + sink);
-                               sinks.add(sinkA[sink]);
-
-                               sinkB[sink] = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B:" + sink);
-                               sinks.add(sinkB[sink]);
+                               mappedA.output(new DiscardingOutputFormat<Long>());
+                               mappedC.output(new DiscardingOutputFormat<Long>());
                        }
-
-                       // return the PACT plan
-                       Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-
+
+                       Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
                        OptimizedPlan oPlan = compileNoStats(plan);
-
-                       // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
+
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail(e.getMessage());
+                       fail(e.getMessage());
                }
        }
 
@@ -122,57 +115,44 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
         *        (SINK A)    (SINK B)  (SINK C)
         * </pre>
         */
+       @SuppressWarnings("unchecked")
        @Test
        public void testBranchingWithMultipleDataSinks2() {
                try {
-                       // construct the plan
-                       final String out1Path = "file:///test/1";
-                       final String out2Path = "file:///test/2";
-                       final String out3Path = "file:///test/3";
-       
-                       FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+                       DataSet<Long> source = env.generateSequence(1, 10000);
+
+                       DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+                       DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
+                       DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
+
+                       mappedB.output(new DiscardingOutputFormat<Long>());
+                       mappedC.output(new DiscardingOutputFormat<Long>());
+                       mappedC.output(new DiscardingOutputFormat<Long>());
+
+                       Plan plan = env.createProgramPlan();
+                       Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
 
-                       MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build();
-                       MapOperator mapB = MapOperator.builder(IdentityMap.class).input(mapA).name("Map B").build();
-                       MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build();
-
-                       FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, mapB, "Sink A");
-                       FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B");
-                       FileDataSink sinkC = new FileDataSink(DummyOutputFormat.class, out3Path, mapC, "Sink C");
-
-                       List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-                       sinks.add(sinkA);
-                       sinks.add(sinkB);
-                       sinks.add(sinkC);
-
-                       // return the PACT plan
-                       Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-
                        OptimizedPlan oPlan = compileNoStats(plan);
-
+
                        // ---------- check the optimizer plan ----------
-
+
                        // number of sinks
-                       Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-
-                       // sinks contain all sink paths
-                       Set<String> allSinks = new HashSet<String>();
-                       allSinks.add(out1Path);
-                       allSinks.add(out2Path);
-                       allSinks.add(out3Path);
-
-                       for (SinkPlanNode n : oPlan.getDataSinks()) {
-                               String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
-                               Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
+                       assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
+
+                       // remove matching sinks to check relation
+                       for (SinkPlanNode sink : oPlan.getDataSinks()) {
+                               assertTrue(sinks.remove(sink.getProgramOperator()));
                        }
-
-                       // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-                       
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
+                       assertTrue(sinks.isEmpty());
+
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail(e.getMessage());
+                       fail(e.getMessage());
                }
        }
        
@@ -203,72 +183,64 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
        @Test
        public void testBranchingSourceMultipleTimes() {
                try {
-                       // construct the plan
-                       FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
-                       JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceA)
-                               .input2(sourceA)
-                               .build();
-                       JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceA)
-                               .input2(mat1)
-                               .build();
-                       JoinOperator mat3 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceA)
-                               .input2(mat2)
-                               .build();
-                       JoinOperator mat4 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceA)
-                               .input2(mat3)
-                               .build();
-                       JoinOperator mat5 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceA)
-                               .input2(mat4)
-                               .build();
-
-                       MapOperator ma = MapOperator.builder(new IdentityMap()).input(sourceA).build();
-
-                       JoinOperator mat6 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(ma)
-                               .input2(ma)
-                               .build();
-                       JoinOperator mat7 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(ma)
-                               .input2(mat6)
-                               .build();
-                       JoinOperator mat8 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(ma)
-                               .input2(mat7)
-                               .build();
-                       JoinOperator mat9 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(ma)
-                               .input2(mat8)
-                               .build();
-                       JoinOperator mat10 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(ma)
-                               .input2(mat9)
-                               .build();
-
-                       CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0, 0)
-                               .input1(mat5)
-                               .input2(mat10)
-                               .build();
-
-                       FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, co);
-
-                       // return the PACT plan
-                       Plan plan = new Plan(sink, "Branching Source Multiple Times");
-
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+                       DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
+                               .map(new Duplicator<Long>());
+
+                       DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> mapped = source.map(
+                                       new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+                                               @Override
+                                               public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+                                                       return null;
+                                               }
+                       });
+
+                       DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+
+                       joined5.coGroup(joined10)
+                                       .where(1).equalTo(1)
+                                       .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+
+                               .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+                       Plan plan = env.createProgramPlan();
                        OptimizedPlan oPlan = compileNoStats(plan);
-                       
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       
-                       //Compile plan to verify that no error is thrown
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail(e.getMessage());
+                       fail(e.getMessage());
                }
        }
        
@@ -294,73 +266,54 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
        @Test
        public void testBranchingWithMultipleDataSinks() {
                try {
-                       // construct the plan
-                       final String out1Path = "file:///test/1";
-                       final String out2Path = "file:///test/2";
-                       final String out3Path = "file:///test/3";
-       
-                       FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-                       FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-                       FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
-                       CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-                               .input1(sourceA)
-                               .input2(sourceB)
-                               .build();
-                       MapOperator ma = MapOperator.builder(new IdentityMap()).input(co).build();
-                       JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(sourceB)
-                               .input2(sourceC)
-                               .build();
-                       JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                               .input1(ma)
-                               .input2(mat1)
-                               .build();
-                       ReduceOperator r = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-                               .input(ma)
-                               .build();
-                       CrossOperator c = CrossOperator.builder(new DummyCrossStub())
-                               .input1(r)
-                               .input2(mat2)
-                               .build();
-
-                       FileDataSink sinkA = new FileDataSink(new DummyOutputFormat(), out1Path, c);
-                       FileDataSink sinkB = new FileDataSink(new DummyOutputFormat(), out2Path, mat2);
-                       FileDataSink sinkC = new FileDataSink(new DummyOutputFormat(), out3Path, mat2);
-
-                       List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-                       sinks.add(sinkA);
-                       sinks.add(sinkB);
-                       sinks.add(sinkC);
-
-                       // return the PACT plan
-                       Plan plan = new Plan(sinks, "Branching Plans With Multiple Data Sinks");
-
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+                       DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
+                                       .map(new Duplicator<Long>());
+
+                       DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
+                                       .map(new Duplicator<Long>());
+
+                       DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
+                                       .map(new Duplicator<Long>());
+
+                       DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
+                                       .where(0).equalTo(1)
+                                       .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+                                                       @Override
+                                                       public void coGroup(Iterable<Tuple2<Long, Long>> first,
+                                                                                       Iterable<Tuple2<Long, Long>> second,
+                                                                                       Collector<Tuple2<Long, Long>> out) {
+                                                         }
+                                       })
+                                       .map(new IdentityMapper<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
+                                       .where(0).equalTo(1)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
+                                       .where(1).equalTo(1)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> reduced = mapped
+                                       .groupBy(1)
+                                       .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
+
+                       reduced.cross(joined2)
+                                       .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+                       joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+                       Plan plan = env.createProgramPlan();
                        OptimizedPlan oPlan = compileNoStats(plan);
-
-                       // ---------- check the optimizer plan ----------
-
-                       // number of sinks
-                       Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-
-                       // sinks contain all sink paths
-                       Set<String> allSinks = new HashSet<String>();
-                       allSinks.add(out1Path);
-                       allSinks.add(out2Path);
-                       allSinks.add(out3Path);
-
-                       for (SinkPlanNode n : oPlan.getDataSinks()) {
-                               String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
-                               Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
-                       }
-
-                       // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-                       
-                       JobGraphGenerator jobGen = new JobGraphGenerator();
-                       jobGen.compileJobGraph(oPlan);
-               } catch (Exception e) {
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
                        e.printStackTrace();
-                       Assert.fail(e.getMessage());
+                       fail(e.getMessage());
                }
        }
        
@@ -860,48 +813,37 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
         */
        @Test
        public void testIterationWithStaticInput() {
-               FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
-               MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
-                               input(source).
-                               name("Identity mapped source").
-                               build();
-
-               ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
-                               input(source).
-                               name("Identity reduce source").
-                               build();
-
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(mappedSource);
-               iteration.setMaximumNumberOfIterations(10);
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(100);
 
-               JoinOperator nextPartialSolution = JoinOperator.builder(DummyMatchStub.class, IntValue.class, 0,0).
-                               input1(iteration.getPartialSolution()).
-                               input2(reducedSource).
-                               name("Next partial solution").
-                               build();
+                       DataSet<Long> source = env.generateSequence(1, 1000000);
 
-               iteration.setNextPartialSolution(nextPartialSolution);
+                       DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
 
-               FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
-               List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-               sinks.add(sink);
+                       DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
 
-               Plan plan = new Plan(sinks);
+                       IterativeDataSet<Long> iteration = mapped.iterate(10);
+                       iteration.closeWith(
+                                       iteration.join(reduced)
+                                                       .where(new IdentityKeyExtractor<Long>())
+                                                       .equalTo(new IdentityKeyExtractor<Long>())
+                                                       .with(new DummyFlatJoinFunction<Long>()))
+                                       .output(new DiscardingOutputFormat<Long>());
 
-               try{
-                       compileNoStats(plan);
-               }catch(Exception e){
+                       compileNoStats(env.createProgramPlan());
+               }
+               catch(Exception e){
                        e.printStackTrace();
-                       Assert.fail(e.getMessage());
+                       fail(e.getMessage());
                }
        }
        
        @Test
        public void testBranchingBroadcastVariable() {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+               env.setDegreeOfParallelism(100);
+
                DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
                DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
                DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
@@ -972,6 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
        @Test
        public void testMultipleIterations() {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(100);
                
                DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
                
@@ -1000,7 +943,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
        @Test
        public void testMultipleIterationsWithClosueBCVars() {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+               env.setDegreeOfParallelism(100);
+
                DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
                        
                IterativeDataSet<String> iteration1 = input.iterate(100);
@@ -1026,7 +970,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
        public void testBranchesOnlyInBCVariables1() {
                try{
                        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       
+                       env.setDegreeOfParallelism(100);
+
                        DataSet<Long> input = env.generateSequence(1, 10);
                        DataSet<Long> bc_input = env.generateSequence(1, 10);
                        
@@ -1048,7 +993,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
        public void testBranchesOnlyInBCVariables2() {
                try{
                        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+                       env.setDegreeOfParallelism(100);
+
                        DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
                        
                        DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
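
Note on the strengthened check in testBranchingWithMultipleDataSinks2: the old
test verified sinks by comparing output file paths, which DataSet-API plans no
longer have. The migrated test instead matches each optimizer sink node back to
the program-level sink operator it was created from. Extracted as a standalone
sketch (CompilerTestBase harness and the new imports assumed):

    Plan plan = env.createProgramPlan();
    Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());

    OptimizedPlan oPlan = compileNoStats(plan);
    assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());

    // every optimizer sink must map back to exactly one program-level sink
    for (SinkPlanNode sink : oPlan.getDataSinks()) {
            assertTrue(sinks.remove(sink.getProgramOperator()));
    }
    assertTrue(sinks.isEmpty());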
