http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java deleted file mode 100644 index 565d992..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.base.BulkIterationBase; -import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.util.OperatingSystem; -import org.apache.flink.util.Visitor; -import org.junit.Before; - -/** - * Base class for Optimizer tests. Offers utility methods to trigger optimization - * of a program and to fetch the nodes in an optimizer plan that correspond - * the the node in the program plan. - */ -public abstract class CompilerTestBase implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random"; - - protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null"; - - protected static final int DEFAULT_PARALLELISM = 8; - - private static final String CACHE_KEY = "cachekey"; - - // ------------------------------------------------------------------------ - - protected transient DataStatistics dataStats; - - protected transient Optimizer withStatsCompiler; - - protected transient Optimizer noStatsCompiler; - - private transient int statCounter; - - // ------------------------------------------------------------------------ - - @Before - public void setup() { - this.dataStats = new DataStatistics(); - this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator()); - this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); - - this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator()); - this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); - } - - // ------------------------------------------------------------------------ - - public OptimizedPlan compileWithStats(Plan p) { - return this.withStatsCompiler.compile(p); - } - - public OptimizedPlan compileNoStats(Plan p) { - return this.noStatsCompiler.compile(p); - } - - public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) { - setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth)); - } - - public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) { - final String key = CACHE_KEY + this.statCounter++; - this.dataStats.cacheBaseStatistics(stats, key); - source.setStatisticsKey(key); - } - - // ------------------------------------------------------------------------ - - public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) { - return new OptimizerPlanNodeResolver(plan); - } - - public static final class OptimizerPlanNodeResolver { - - private final Map<String, ArrayList<PlanNode>> map; - - OptimizerPlanNodeResolver(OptimizedPlan p) { - HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>(); - - for (PlanNode n : p.getAllNodes()) { - Operator<?> c = n.getOriginalOptimizerNode().getOperator(); - String name = c.getName(); - - ArrayList<PlanNode> list = map.get(name); - if (list == null) { - list = new ArrayList<PlanNode>(2); - map.put(name, list); - } - - // check whether this node is a child of a node with the same contract (aka combiner) - boolean shouldAdd = true; - for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) { - PlanNode in = iter.next(); - if (in.getOriginalOptimizerNode().getOperator() == c) { - // is this the child or is our node the child - if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) { - SingleInputPlanNode thisNode = (SingleInputPlanNode) n; - SingleInputPlanNode otherNode = (SingleInputPlanNode) in; - - if (thisNode.getPredecessor() == otherNode) { - // other node is child, remove it - iter.remove(); - } else if (otherNode.getPredecessor() == thisNode) { - shouldAdd = false; - } - } else { - throw new RuntimeException("Unrecodnized case in test."); - } - } - } - - if (shouldAdd) { - list.add(n); - } - } - - this.map = map; - } - - - @SuppressWarnings("unchecked") - public <T extends PlanNode> T getNode(String name) { - List<PlanNode> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name."); - } else if (nodes.size() != 1) { - throw new RuntimeException("Multiple nodes found with the given name."); - } else { - return (T) nodes.get(0); - } - } - - @SuppressWarnings("unchecked") - public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) { - List<PlanNode> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name and stub class."); - } else { - PlanNode found = null; - for (PlanNode node : nodes) { - if (node.getClass() == stubClass) { - if (found == null) { - found = node; - } else { - throw new RuntimeException("Multiple nodes found with the given name and stub class."); - } - } - } - if (found == null) { - throw new RuntimeException("No node found with the given name and stub class."); - } else { - return (T) found; - } - } - } - - public List<PlanNode> getNodes(String name) { - List<PlanNode> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name."); - } else { - return new ArrayList<PlanNode>(nodes); - } - } - } - - /** - * Collects all DataSources of a plan to add statistics - * - */ - public static class SourceCollectorVisitor implements Visitor<Operator<?>> { - - protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4); - - @Override - public boolean preVisit(Operator<?> visitable) { - - if(visitable instanceof GenericDataSourceBase) { - sources.add((GenericDataSourceBase<?, ?>) visitable); - } - else if(visitable instanceof BulkIterationBase) { - ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this); - } - - return true; - } - - @Override - public void postVisit(Operator<?> visitable) {} - - public List<GenericDataSourceBase<?, ?>> getSources() { - return this.sources; - } - - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java deleted file mode 100644 index b17e777..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.optimizer; - -import org.junit.Assert; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.types.IntValue; -import org.apache.flink.util.Visitor; -import org.junit.Test; - -/** - * Tests in this class: - * <ul> - * <li>Tests that check the correct handling of the properties and strategies in the case where the degree of - * parallelism between tasks is increased or decreased. - * </ul> - */ -@SuppressWarnings({"serial", "deprecation"}) -public class DOPChangeTest extends CompilerTestBase { - - /** - * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). - * - * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable. - * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network - * transit as well. - */ - @Test - public void checkPropertyHandlingWithIncreasingGlobalParallelism1() { - final int degOfPar = DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar * 2); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); - ShipStrategyType redIn = red2Node.getInput().getShipStrategy(); - - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn); - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn); - } - - /** - * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). - * - * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable. - * Expected to re-establish partitioning between map and reduce (hash). - */ - @Test - public void checkPropertyHandlingWithIncreasingGlobalParallelism2() { - final int degOfPar = DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); - ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy(); - - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn); - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn); - } - - /** - * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). - * - * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance. - * Expected to re-establish partitioning between map and reduce via a local hash. - */ - @Test - public void checkPropertyHandlingWithIncreasingLocalParallelism() { - final int degOfPar = 2 * DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar * 2); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); - ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy(); - - Assert.assertTrue("Invalid ship strategy for an operator.", - (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || - (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn)); - } - - - - @Test - public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() { - final int degOfPar = DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar * 2); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar * 2); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar * 2); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - Assert.assertTrue("The no sorting local strategy.", - LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() || - LocalStrategy.SORT == map2Node.getInput().getLocalStrategy()); - - Assert.assertTrue("The no partitioning ship strategy.", - ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() || - ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy()); - } - - /** - * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs. - * - * Test Plan: - * <pre> - * - * (source) -> reduce -\ - * Match -> (sink) - * (source) -> reduce -/ - * - * </pre> - * - */ - @Test - public void checkPropertyHandlingWithTwoInputs() { - // construct the plan - - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); - - ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceA) - .build(); - ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceB) - .build(); - - JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(redA) - .input2(redB) - .build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat); - - sourceA.setDegreeOfParallelism(5); - sourceB.setDegreeOfParallelism(7); - redA.setDegreeOfParallelism(5); - redB.setDegreeOfParallelism(7); - - mat.setDegreeOfParallelism(5); - - sink.setDegreeOfParallelism(5); - - - // return the PACT plan - Plan plan = new Plan(sink, "Partition on DoP Change"); - - OptimizedPlan oPlan = compileNoStats(plan); - - JobGraphGenerator jobGen = new JobGraphGenerator(); - - //Compile plan to verify that no error is thrown - jobGen.compileJobGraph(oPlan); - - oPlan.accept(new Visitor<PlanNode>() { - - @Override - public boolean preVisit(PlanNode visitable) { - if (visitable instanceof DualInputPlanNode) { - DualInputPlanNode node = (DualInputPlanNode) visitable; - Channel c1 = node.getInput1(); - Channel c2 = node.getInput2(); - - Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy()); - Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy()); - return false; - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) { - // DO NOTHING - } - }); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java deleted file mode 100644 index aaee975..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.junit.Test; - -@SuppressWarnings("serial") -public class DisjointDataFlowsTest extends CompilerTestBase { - - @Test - public void testDisjointFlows() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // generate two different flows - env.generateSequence(1, 10).print(); - env.generateSequence(1, 10).print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java deleted file mode 100644 index 34aa9f8..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.optimizer; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.DistinctOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plan.SourcePlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - - -@SuppressWarnings("serial") -public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable { - - @Test - public void testDistinctPlain() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) - .name("source").setParallelism(6); - - data - .distinct().name("reducer") - .print().name("sink"); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); - - // get the original nodes - SourcePlanNode sourceNode = resolver.getNode("source"); - SingleInputPlanNode reduceNode = resolver.getNode("reducer"); - SinkPlanNode sinkNode = resolver.getNode("sink"); - - // get the combiner - SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); - - // check wiring - assertEquals(sourceNode, combineNode.getInput().getSource()); - assertEquals(reduceNode, sinkNode.getInput().getSource()); - - // check that both reduce and combiner have the same strategy - assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); - assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy()); - - // check the keys - assertEquals(new FieldList(0, 1), reduceNode.getKeys(0)); - assertEquals(new FieldList(0, 1), combineNode.getKeys(0)); - assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys()); - - // check DOP - assertEquals(6, sourceNode.getParallelism()); - assertEquals(6, combineNode.getParallelism()); - assertEquals(8, reduceNode.getParallelism()); - assertEquals(8, sinkNode.getParallelism()); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); - } - } - - @Test - public void testDistinctWithSelectorFunctionKey() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) - .name("source").setParallelism(6); - - data - .distinct(new KeySelector<Tuple2<String,Double>, String>() { - public String getKey(Tuple2<String, Double> value) { return value.f0; } - }).name("reducer") - .print().name("sink"); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); - - // get the original nodes - SourcePlanNode sourceNode = resolver.getNode("source"); - SingleInputPlanNode reduceNode = resolver.getNode("reducer"); - SinkPlanNode sinkNode = resolver.getNode("sink"); - - // get the combiner - SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); - - // get the key extractors and projectors - SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource(); - SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource(); - - // check wiring - assertEquals(sourceNode, keyExtractor.getInput().getSource()); - assertEquals(keyProjector, sinkNode.getInput().getSource()); - - // check that both reduce and combiner have the same strategy - assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); - assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy()); - - // check the keys - assertEquals(new FieldList(0), reduceNode.getKeys(0)); - assertEquals(new FieldList(0), combineNode.getKeys(0)); - assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); - - // check DOP - assertEquals(6, sourceNode.getParallelism()); - assertEquals(6, keyExtractor.getParallelism()); - assertEquals(6, combineNode.getParallelism()); - - assertEquals(8, reduceNode.getParallelism()); - assertEquals(8, keyProjector.getParallelism()); - assertEquals(8, sinkNode.getParallelism()); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); - } - } - - @Test - public void testDistinctWithFieldPositionKeyCombinable() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); - - DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) - .name("source").setParallelism(6); - - DistinctOperator<Tuple2<String, Double>> reduced = data - .distinct(1).name("reducer"); - - reduced.print().name("sink"); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); - - // get the original nodes - SourcePlanNode sourceNode = resolver.getNode("source"); - SingleInputPlanNode reduceNode = resolver.getNode("reducer"); - SinkPlanNode sinkNode = resolver.getNode("sink"); - - // get the combiner - SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); - - // check wiring - assertEquals(sourceNode, combineNode.getInput().getSource()); - assertEquals(reduceNode, sinkNode.getInput().getSource()); - - // check that both reduce and combiner have the same strategy - assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); - assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy()); - - // check the keys - assertEquals(new FieldList(1), reduceNode.getKeys(0)); - assertEquals(new FieldList(1), combineNode.getKeys(0)); - assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); - - // check DOP - assertEquals(6, sourceNode.getParallelism()); - assertEquals(6, combineNode.getParallelism()); - assertEquals(8, reduceNode.getParallelism()); - assertEquals(8, sinkNode.getParallelism()); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); - } - } -} \ No newline at end of file