http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
new file mode 100644
index 0000000..565d992
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.operators.AbstractUdfOperator;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Visitor;
+import org.junit.Before;
+
+/**
+ * Base class for Optimizer tests. Offers utility methods to trigger optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * to the nodes in the program plan.
+ */
+public abstract class CompilerTestBase implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
+
+    protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
+
+    protected static final int DEFAULT_PARALLELISM = 8;
+
+    private static final String CACHE_KEY = "cachekey";
+
+    // ------------------------------------------------------------------------
+
+    protected transient DataStatistics dataStats;
+
+    protected transient Optimizer withStatsCompiler;
+
+    protected transient Optimizer noStatsCompiler;
+
+    private transient int statCounter;
+
+    // ------------------------------------------------------------------------
+
+    @Before
+    public void setup() {
+        this.dataStats = new DataStatistics();
+        this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
+        this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+        this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
+        this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+    }
+
+    // ------------------------------------------------------------------------
+
+    public OptimizedPlan compileWithStats(Plan p) {
+        return this.withStatsCompiler.compile(p);
+    }
+
+    public OptimizedPlan compileNoStats(Plan p) {
+        return this.noStatsCompiler.compile(p);
+    }
+
+    public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
+        setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
+    }
+
+    public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
+        final String key = CACHE_KEY + this.statCounter++;
+        this.dataStats.cacheBaseStatistics(stats, key);
+        source.setStatisticsKey(key);
+    }
+
+    // ------------------------------------------------------------------------
+
+    public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
+        return new OptimizerPlanNodeResolver(plan);
+    }
+
+    public static final class OptimizerPlanNodeResolver {
+
+        private final Map<String, ArrayList<PlanNode>> map;
+
+        OptimizerPlanNodeResolver(OptimizedPlan p) {
+            HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
+
+            for (PlanNode n : p.getAllNodes()) {
+                Operator<?> c = n.getOriginalOptimizerNode().getOperator();
+                String name = c.getName();
+
+                ArrayList<PlanNode> list = map.get(name);
+                if (list == null) {
+                    list = new ArrayList<PlanNode>(2);
+                    map.put(name, list);
+                }
+
+                // check whether this node is a child of a node with the same contract (aka combiner)
+                boolean shouldAdd = true;
+                for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
+                    PlanNode in = iter.next();
+                    if (in.getOriginalOptimizerNode().getOperator() == c) {
+                        // is this the child, or is our node the child?
+                        if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
+                            SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
+                            SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
+
+                            if (thisNode.getPredecessor() == otherNode) {
+                                // the other node is the child, remove it
+                                iter.remove();
+                            } else if (otherNode.getPredecessor() == thisNode) {
+                                shouldAdd = false;
+                            }
+                        } else {
+                            throw new RuntimeException("Unrecognized case in test.");
+                        }
+                    }
+                }
+
+                if (shouldAdd) {
+                    list.add(n);
+                }
+            }
+
+            this.map = map;
+        }
+
+
+        @SuppressWarnings("unchecked")
+        public <T extends PlanNode> T getNode(String name) {
+            List<PlanNode> nodes = this.map.get(name);
+            if (nodes == null || nodes.isEmpty()) {
+                throw new RuntimeException("No node found with the given name.");
+            } else if (nodes.size() != 1) {
+                throw new RuntimeException("Multiple nodes found with the given name.");
+            } else {
+                return (T) nodes.get(0);
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+            List<PlanNode> nodes = this.map.get(name);
+            if (nodes == null || nodes.isEmpty()) {
+                throw new RuntimeException("No node found with the given name and stub class.");
+            } else {
+                PlanNode found = null;
+                for (PlanNode node : nodes) {
+                    // compare the operator's user-code class, not the class of the plan node itself
+                    Operator<?> op = node.getOriginalOptimizerNode().getOperator();
+                    if (op instanceof AbstractUdfOperator &&
+                            ((AbstractUdfOperator<?, ?>) op).getUserCodeWrapper().getUserCodeClass() == stubClass) {
+                        if (found == null) {
+                            found = node;
+                        } else {
+                            throw new RuntimeException("Multiple nodes found with the given name and stub class.");
+                        }
+                    }
+                }
+                if (found == null) {
+                    throw new RuntimeException("No node found with the given name and stub class.");
+                } else {
+                    return (T) found;
+                }
+            }
+        }
+
+        public List<PlanNode> getNodes(String name) {
+            List<PlanNode> nodes = this.map.get(name);
+            if (nodes == null || nodes.isEmpty()) {
+                throw new RuntimeException("No node found with the given name.");
+            } else {
+                return new ArrayList<PlanNode>(nodes);
+            }
+        }
+    }
+
+    /**
+     * Collects all data sources of a plan, so that statistics
+     * can be attached to them.
+     */
+    public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
+
+        protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
+
+        @Override
+        public boolean preVisit(Operator<?> visitable) {
+
+            if (visitable instanceof GenericDataSourceBase) {
+                sources.add((GenericDataSourceBase<?, ?>) visitable);
+            }
+            else if (visitable instanceof BulkIterationBase) {
+                ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
+            }
+
+            return true;
+        }
+
+        @Override
+        public void postVisit(Operator<?> visitable) {}
+
+        public List<GenericDataSourceBase<?, ?>> getSources() {
+            return this.sources;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
new file mode 100644
index 0000000..b17e777
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.junit.Assert;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+/**
+ * Tests in this class:
+ * <ul>
+ *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
+ *   parallelism between tasks is increased or decreased.</li>
+ * </ul>
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class DOPChangeTest extends CompilerTestBase {
+
+    /**
+     * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+     *
+     * The DOP is increased between the 1st reduce and the 2nd map, so the hash partitioning established by the
+     * 1st reduce is not reusable. The optimizer is expected to re-establish the partitioning between reduce and
+     * map via hashing, since random repartitioning would be a full network transfer as well.
+     */
+    @Test
+    public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
+        final int degOfPar = DEFAULT_PARALLELISM;
+
+        // construct the plan
+        FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+        source.setDegreeOfParallelism(degOfPar);
+
+        MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+        map1.setDegreeOfParallelism(degOfPar);
+        map1.setInput(source);
+
+        ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+        reduce1.setDegreeOfParallelism(degOfPar);
+        reduce1.setInput(map1);
+
+        MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+        map2.setDegreeOfParallelism(degOfPar * 2);
+        map2.setInput(reduce1);
+
+        ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+        reduce2.setDegreeOfParallelism(degOfPar * 2);
+        reduce2.setInput(map2);
+
+        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+        sink.setDegreeOfParallelism(degOfPar * 2);
+        sink.setInput(reduce2);
+
+        Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+
+        // submit the plan to the compiler
+        OptimizedPlan oPlan = compileNoStats(plan);
+
+        // check the optimized plan
+        // when reducer 1 distributes its data across the instances of map2, it needs to hash-partition the data,
+        // because map2 has twice as many instances and key/value pairs with the same key must be processed by the
+        // same mapper and, downstream, the same reducer
+        SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+        SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+        ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+        ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
+
+        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
+        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
+    }
+
+    /**
+     * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+     *
+     * The DOP is increased between the 2nd map and the 2nd reduce, so the hash partitioning from the 1st reduce
+     * is not reusable. The optimizer is expected to re-establish the partitioning between map and reduce (hash).
+     */
+    @Test
+    public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
+        final int degOfPar = DEFAULT_PARALLELISM;
+
+        // construct the plan
+        FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+        source.setDegreeOfParallelism(degOfPar);
+
+        MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+        map1.setDegreeOfParallelism(degOfPar);
+        map1.setInput(source);
+
+        ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+        reduce1.setDegreeOfParallelism(degOfPar);
+        reduce1.setInput(map1);
+
+        MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+        map2.setDegreeOfParallelism(degOfPar);
+        map2.setInput(reduce1);
+
+        ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+        reduce2.setDegreeOfParallelism(degOfPar * 2);
+        reduce2.setInput(map2);
+
+        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+        sink.setDegreeOfParallelism(degOfPar * 2);
+        sink.setInput(reduce2);
+
+        Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+
+        // submit the plan to the compiler
+        OptimizedPlan oPlan = compileNoStats(plan);
+
+        // check the optimized plan
+        // the partitioning established by reducer 1 is forwarded through map2, which runs with the same
+        // parallelism; reducer 2, however, runs with twice the parallelism, so the data must be re-partitioned
+        // by hash between map2 and reducer 2
+        SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+        SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+        ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+        ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+
+        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+    }
+
+    /**
+     * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+     *
+     * The DOP is increased between the 1st reduce and the 2nd map, such that more tasks run on one instance.
+     * The optimizer is expected to re-establish the partitioning between map and reduce via a local hash.
+     */
+    @Test
+    public void checkPropertyHandlingWithIncreasingLocalParallelism() {
+        final int degOfPar = 2 * DEFAULT_PARALLELISM;
+
+        // construct the plan
+        FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+        source.setDegreeOfParallelism(degOfPar);
+
+        MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+        map1.setDegreeOfParallelism(degOfPar);
+        map1.setInput(source);
+
+        ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+        reduce1.setDegreeOfParallelism(degOfPar);
+        reduce1.setInput(map1);
+
+        MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+        map2.setDegreeOfParallelism(degOfPar * 2);
+        map2.setInput(reduce1);
+
+        ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+        reduce2.setDegreeOfParallelism(degOfPar * 2);
+        reduce2.setInput(map2);
+
+        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+        sink.setDegreeOfParallelism(degOfPar * 2);
+        sink.setInput(reduce2);
+
+        Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
+
+        // submit the plan to the compiler
+        OptimizedPlan oPlan = compileNoStats(plan);
+
+        // check the optimized plan
+        // when reducer 1 distributes its data across the instances of map2, it needs to hash-partition the data,
+        // because map2 has twice as many instances and key/value pairs with the same key must be processed by the
+        // same mapper and, downstream, the same reducer
+        SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+        SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+        ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+        ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+
+        Assert.assertTrue("Invalid ship strategy for an operator.",
+                (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
+                (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
+    }
+
+
+
+    @Test
+    public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
+        final int degOfPar = DEFAULT_PARALLELISM;
+
+        // construct the plan
+        FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+        source.setDegreeOfParallelism(degOfPar * 2);
+
+        MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+        map1.setDegreeOfParallelism(degOfPar * 2);
+        map1.setInput(source);
+
+        ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+        reduce1.setDegreeOfParallelism(degOfPar * 2);
+        reduce1.setInput(map1);
+
+        MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+        map2.setDegreeOfParallelism(degOfPar);
+        map2.setInput(reduce1);
+
+        ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+        reduce2.setDegreeOfParallelism(degOfPar);
+        reduce2.setInput(map2);
+
+        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+        sink.setDegreeOfParallelism(degOfPar);
+        sink.setInput(reduce2);
+
+        Plan plan = new Plan(sink, "Test Decreasing Degree Of Parallelism");
+
+        // submit the plan to the compiler
+        OptimizedPlan oPlan = compileNoStats(plan);
+
+        // check the optimized plan
+        // the parallelism is reduced from degOfPar * 2 down to degOfPar between reducer 1 and map2, so the
+        // hash partitioning of reducer 1 is lost and must be re-established, including a new sort, on the
+        // way into map2 or reducer 2
+        SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+        SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+        Assert.assertTrue("Missing sort local strategy.",
+                LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+                LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
+
+        Assert.assertTrue("Missing hash partitioning ship strategy.",
+                ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+                ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
+    }
+
+    /**
+     * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
+     *
+     * Test Plan:
+     * <pre>
+     *
+     * (source) -> reduce -\
+     *                      Match -> (sink)
+     * (source) -> reduce -/
+     *
+     * </pre>
+     *
+     */
+    @Test
+    public void checkPropertyHandlingWithTwoInputs() {
+        // construct the plan
+
+        FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
+        FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
+
+        ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+            .input(sourceA)
+            .build();
+        ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+            .input(sourceB)
+            .build();
+
+        JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+            .input1(redA)
+            .input2(redB)
+            .build();
+
+        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
+
+        sourceA.setDegreeOfParallelism(5);
+        sourceB.setDegreeOfParallelism(7);
+        redA.setDegreeOfParallelism(5);
+        redB.setDegreeOfParallelism(7);
+
+        mat.setDegreeOfParallelism(5);
+
+        sink.setDegreeOfParallelism(5);
+
+
+        // create the plan
+        Plan plan = new Plan(sink, "Partition on DoP Change");
+
+        OptimizedPlan oPlan = compileNoStats(plan);
+
+        JobGraphGenerator jobGen = new JobGraphGenerator();
+
+        // compile the plan to verify that no exception is thrown
+        jobGen.compileJobGraph(oPlan);
+
+        oPlan.accept(new Visitor<PlanNode>() {
+
+            @Override
+            public boolean preVisit(PlanNode visitable) {
+                if (visitable instanceof DualInputPlanNode) {
+                    DualInputPlanNode node = (DualInputPlanNode) visitable;
+                    Channel c1 = node.getInput1();
+                    Channel c2 = node.getInput2();
+
+                    Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
+                    Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
+                    return false;
+                }
+                return true;
+            }
+
+            @Override
+            public void postVisit(PlanNode visitable) {
+                // DO NOTHING
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..aaee975
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+
+    @Test
+    public void testDisjointFlows() {
+        try {
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            // generate two different flows
+            env.generateSequence(1, 10).print();
+            env.generateSequence(1, 10).print();
+
+            Plan p = env.createProgramPlan();
+            OptimizedPlan op = compileNoStats(p);
+
+            new JobGraphGenerator().compileJobGraph(op);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
new file mode 100644
index 0000000..34aa9f8
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+    @Test
+    public void testDistinctPlain() {
+        try {
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+            env.setDegreeOfParallelism(8);
+
+            DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                    .name("source").setParallelism(6);
+
+            data
+                    .distinct().name("reducer")
+                    .print().name("sink");
+
+            Plan p = env.createProgramPlan();
+            OptimizedPlan op = compileNoStats(p);
+
+            OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+            // get the original nodes
+            SourcePlanNode sourceNode = resolver.getNode("source");
+            SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+            SinkPlanNode sinkNode = resolver.getNode("sink");
+
+            // get the combiner
+            SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+            // check wiring
+            assertEquals(sourceNode, combineNode.getInput().getSource());
+            assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+            // check that both reduce and combiner have the same strategy
+            assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+            assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+            // check the keys
+            assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+            assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+            assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+            // check DOP
+            assertEquals(6, sourceNode.getParallelism());
+            assertEquals(6, combineNode.getParallelism());
+            assertEquals(8, reduceNode.getParallelism());
+            assertEquals(8, sinkNode.getParallelism());
+        }
+        catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDistinctWithSelectorFunctionKey() {
+        try {
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+            env.setDegreeOfParallelism(8);
+
+            DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                    .name("source").setParallelism(6);
+
+            data
+                    .distinct(new KeySelector<Tuple2<String, Double>, String>() {
+                        public String getKey(Tuple2<String, Double> value) { return value.f0; }
+                    }).name("reducer")
+                    .print().name("sink");
+
+            Plan p = env.createProgramPlan();
+            OptimizedPlan op = compileNoStats(p);
+
+            OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+            // get the original nodes
+            SourcePlanNode sourceNode = resolver.getNode("source");
+            SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+            SinkPlanNode sinkNode = resolver.getNode("sink");
+
+            // get the combiner
+            SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+            // get the key extractor and projector
+            SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+            SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+            // check wiring
+            assertEquals(sourceNode, keyExtractor.getInput().getSource());
+            assertEquals(reduceNode, keyProjector.getInput().getSource());
+
+            // check that both reduce and combiner have the same strategy
+            assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+            assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+            // check the keys
+            assertEquals(new FieldList(0), reduceNode.getKeys(0));
+            assertEquals(new FieldList(0), combineNode.getKeys(0));
+            assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+            // check DOP
+            assertEquals(6, sourceNode.getParallelism());
+            assertEquals(6, keyExtractor.getParallelism());
+            assertEquals(6, combineNode.getParallelism());
+
+            assertEquals(8, reduceNode.getParallelism());
+            assertEquals(8, keyProjector.getParallelism());
+            assertEquals(8, sinkNode.getParallelism());
+        }
+        catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDistinctWithFieldPositionKeyCombinable() {
+        try {
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+            env.setDegreeOfParallelism(8);
+
+            DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                    .name("source").setParallelism(6);
+
+            DistinctOperator<Tuple2<String, Double>> reduced = data
+                    .distinct(1).name("reducer");
+
+            reduced.print().name("sink");
+
+            Plan p = env.createProgramPlan();
+            OptimizedPlan op = compileNoStats(p);
+
+            OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+            // get the original nodes
+            SourcePlanNode sourceNode = resolver.getNode("source");
+            SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+            SinkPlanNode sinkNode = resolver.getNode("sink");
+
+            // get the combiner
+            SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+            // check wiring
+            assertEquals(sourceNode, combineNode.getInput().getSource());
+            assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+            // check that both reduce and combiner have the same strategy
+            assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+            assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+            // check the keys
+            assertEquals(new FieldList(1), reduceNode.getKeys(0));
+            assertEquals(new FieldList(1), combineNode.getKeys(0));
+            assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+            // check DOP
+            assertEquals(6, sourceNode.getParallelism());
+            assertEquals(6, combineNode.getParallelism());
+            assertEquals(8, reduceNode.getParallelism());
+            assertEquals(8, sinkNode.getParallelism());
+        }
+        catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+        }
+    }
+}
\ No newline at end of file