[optimizer] Migrate first set of tests (branching plans) to new API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2db1812 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2db1812 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2db1812 Branch: refs/heads/master Commit: c2db18120c53dc8712b08369f7ea5b93ace98c6b Parents: a9150b3 Author: Stephan Ewen <se...@apache.org> Authored: Tue Mar 17 16:14:50 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 20 10:21:14 2015 +0100 ---------------------------------------------------------------------- .../optimizer/BranchingPlansCompilerTest.java | 414 ++++++++----------- 1 file changed, 180 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c2db1812/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java index ff0e004..916aa27 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.optimizer; import static org.junit.Assert.*; @@ -26,6 +25,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.optimizer.testfunctions.SelectOneReducer; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -73,39 +78,27 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { final int SINKS = 5; try { - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - - // construct the plan - final String out1Path = "file:///test/1"; - final String out2Path = "file:///test/2"; - - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE); - - MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build(); - MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build(); - - FileDataSink[] sinkA = new FileDataSink[SINKS]; - FileDataSink[] sinkB = new FileDataSink[SINKS]; + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSet<Long> source = env.generateSequence(1, 10000); + + DataSet<Long> mappedA = source.map(new IdentityMapper<Long>()); + DataSet<Long> mappedC = source.map(new IdentityMapper<Long>()); + for (int sink = 0; sink < SINKS; sink++) { - sinkA[sink] = new FileDataSink(DummyOutputFormat.class, out1Path, mapA, "Sink A:" + sink); - sinks.add(sinkA[sink]); - - sinkB[sink] = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B:" + sink); - sinks.add(sinkB[sink]); + mappedA.output(new DiscardingOutputFormat<Long>()); + mappedC.output(new DiscardingOutputFormat<Long>()); } - - // return the PACT plan - Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks"); - + + Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks"); OptimizedPlan oPlan = compileNoStats(plan); - - // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - - JobGraphGenerator jobGen = new JobGraphGenerator(); - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { + + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { e.printStackTrace(); - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } @@ -122,57 +115,44 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { * (SINK A) (SINK B) (SINK C) * </pre> */ + @SuppressWarnings("unchecked") @Test public void testBranchingWithMultipleDataSinks2() { try { - // construct the plan - final String out1Path = "file:///test/1"; - final String out2Path = "file:///test/2"; - final String out3Path = "file:///test/3"; - - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSet<Long> source = env.generateSequence(1, 10000); + + DataSet<Long> mappedA = source.map(new IdentityMapper<Long>()); + DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>()); + DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>()); + + mappedB.output(new DiscardingOutputFormat<Long>()); + mappedC.output(new DiscardingOutputFormat<Long>()); + mappedC.output(new DiscardingOutputFormat<Long>()); + + Plan plan = env.createProgramPlan(); + Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks()); - MapOperator mapA = MapOperator.builder(IdentityMap.class).input(sourceA).name("Map A").build(); - MapOperator mapB = MapOperator.builder(IdentityMap.class).input(mapA).name("Map B").build(); - MapOperator mapC = MapOperator.builder(IdentityMap.class).input(mapA).name("Map C").build(); - - FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, mapB, "Sink A"); - FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, mapC, "Sink B"); - FileDataSink sinkC = new FileDataSink(DummyOutputFormat.class, out3Path, mapC, "Sink C"); - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sinkA); - sinks.add(sinkB); - sinks.add(sinkC); - - // return the PACT plan - Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks"); - OptimizedPlan oPlan = compileNoStats(plan); - + // ---------- check the optimizer plan ---------- - + // number of sinks - Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size()); - - // sinks contain all sink paths - Set<String> allSinks = new HashSet<String>(); - allSinks.add(out1Path); - allSinks.add(out2Path); - allSinks.add(out3Path); - - for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); - Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); + assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size()); + + // remove matching sinks to check relation + for (SinkPlanNode sink : oPlan.getDataSinks()) { + assertTrue(sinks.remove(sink.getProgramOperator())); } - - // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - - JobGraphGenerator jobGen = new JobGraphGenerator(); - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { + assertTrue(sinks.isEmpty()); + + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { e.printStackTrace(); - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } @@ -203,72 +183,64 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testBranchingSourceMultipleTimes() { try { - // construct the plan - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - - JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceA) - .input2(sourceA) - .build(); - JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceA) - .input2(mat1) - .build(); - JoinOperator mat3 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceA) - .input2(mat2) - .build(); - JoinOperator mat4 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceA) - .input2(mat3) - .build(); - JoinOperator mat5 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceA) - .input2(mat4) - .build(); - - MapOperator ma = MapOperator.builder(new IdentityMap()).input(sourceA).build(); - - JoinOperator mat6 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(ma) - .input2(ma) - .build(); - JoinOperator mat7 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(ma) - .input2(mat6) - .build(); - JoinOperator mat8 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(ma) - .input2(mat7) - .build(); - JoinOperator mat9 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(ma) - .input2(mat8) - .build(); - JoinOperator mat10 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(ma) - .input2(mat9) - .build(); - - CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0, 0) - .input1(mat5) - .input2(mat10) - .build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, co); - - // return the PACT plan - Plan plan = new Plan(sink, "Branching Source Multiple Times"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000) + .map(new Duplicator<Long>()); + + DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> mapped = source.map( + new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Tuple2<Long, Long> value) { + return null; + } + }); + + DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + + joined5.coGroup(joined10) + .where(1).equalTo(1) + .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>()) + + .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()); + + Plan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); - - JobGraphGenerator jobGen = new JobGraphGenerator(); - - //Compile plan to verify that no error is thrown - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { e.printStackTrace(); - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } @@ -294,73 +266,54 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testBranchingWithMultipleDataSinks() { try { - // construct the plan - final String out1Path = "file:///test/1"; - final String out2Path = "file:///test/2"; - final String out3Path = "file:///test/3"; - - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), IN_FILE); - - CoGroupOperator co = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(sourceA) - .input2(sourceB) - .build(); - MapOperator ma = MapOperator.builder(new IdentityMap()).input(co).build(); - JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceB) - .input2(sourceC) - .build(); - JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(ma) - .input2(mat1) - .build(); - ReduceOperator r = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(ma) - .build(); - CrossOperator c = CrossOperator.builder(new DummyCrossStub()) - .input1(r) - .input2(mat2) - .build(); - - FileDataSink sinkA = new FileDataSink(new DummyOutputFormat(), out1Path, c); - FileDataSink sinkB = new FileDataSink(new DummyOutputFormat(), out2Path, mat2); - FileDataSink sinkC = new FileDataSink(new DummyOutputFormat(), out3Path, mat2); - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sinkA); - sinks.add(sinkB); - sinks.add(sinkC); - - // return the PACT plan - Plan plan = new Plan(sinks, "Branching Plans With Multiple Data Sinks"); - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000) + .map(new Duplicator<Long>()); + + DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000) + .map(new Duplicator<Long>()); + + DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000) + .map(new Duplicator<Long>()); + + DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB) + .where(0).equalTo(1) + .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { + @Override + public void coGroup(Iterable<Tuple2<Long, Long>> first, + Iterable<Tuple2<Long, Long>> second, + Collector<Tuple2<Long, Long>> out) { + } + }) + .map(new IdentityMapper<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC) + .where(0).equalTo(1) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined) + .where(1).equalTo(1) + .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> reduced = mapped + .groupBy(1) + .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()); + + reduced.cross(joined2) + .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()); + + joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + + Plan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); - - // ---------- check the optimizer plan ---------- - - // number of sinks - Assert.assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size()); - - // sinks contain all sink paths - Set<String> allSinks = new HashSet<String>(); - allSinks.add(out1Path); - allSinks.add(out2Path); - allSinks.add(out3Path); - - for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); - Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); - } - - // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - - JobGraphGenerator jobGen = new JobGraphGenerator(); - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { e.printStackTrace(); - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } @@ -860,48 +813,37 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { */ @Test public void testIterationWithStaticInput() { - FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source"); - - MapOperator mappedSource = MapOperator.builder(IdentityMap.class). - input(source). - name("Identity mapped source"). - build(); - - ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class). - input(source). - name("Identity reduce source"). - build(); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(mappedSource); - iteration.setMaximumNumberOfIterations(10); + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(100); - JoinOperator nextPartialSolution = JoinOperator.builder(DummyMatchStub.class, IntValue.class, 0,0). - input1(iteration.getPartialSolution()). - input2(reducedSource). - name("Next partial solution"). - build(); + DataSet<Long> source = env.generateSequence(1, 1000000); - iteration.setNextPartialSolution(nextPartialSolution); + DataSet<Long> mapped = source.map(new IdentityMapper<Long>()); - FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink"); - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sink); + DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>()); - Plan plan = new Plan(sinks); + IterativeDataSet<Long> iteration = mapped.iterate(10); + iteration.closeWith( + iteration.join(reduced) + .where(new IdentityKeyExtractor<Long>()) + .equalTo(new IdentityKeyExtractor<Long>()) + .with(new DummyFlatJoinFunction<Long>())) + .output(new DiscardingOutputFormat<Long>()); - try{ - compileNoStats(plan); - }catch(Exception e){ + compileNoStats(env.createProgramPlan()); + } + catch(Exception e){ e.printStackTrace(); - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } @Test public void testBranchingBroadcastVariable() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + env.setDegreeOfParallelism(100); + DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1"); DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2"); DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3"); @@ -972,6 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testMultipleIterations() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(100); DataSet<String> input = env.readTextFile(IN_FILE).name("source1"); @@ -1000,7 +943,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testMultipleIterationsWithClosueBCVars() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + env.setDegreeOfParallelism(100); + DataSet<String> input = env.readTextFile(IN_FILE).name("source1"); IterativeDataSet<String> iteration1 = input.iterate(100); @@ -1026,7 +970,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchesOnlyInBCVariables1() { try{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + env.setDegreeOfParallelism(100); + DataSet<Long> input = env.generateSequence(1, 10); DataSet<Long> bc_input = env.generateSequence(1, 10); @@ -1048,7 +993,8 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchesOnlyInBCVariables2() { try{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + env.setDegreeOfParallelism(100); + DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input"); DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");