http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
new file mode 100644
index 0000000..5175d8c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test verifies that the optimizer assigns the correct
+ * data exchange mode to a simple forward / shuffle plan.
+ *
+ * <pre>
+ *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeForwardTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelinedForced() throws Exception {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() throws Exception {
+		// PIPELINED should result in pipelining all the way
+		verifySimpleForwardPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatch() throws Exception {
+		// BATCH should batch only the shuffle into the group reducer;
+		// all other connections stay pipelined
+		verifySimpleForwardPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatchForced() throws Exception {
+		// BATCH_FORCED should result in batching all the way, except for
+		// the connection into the combiner, which is expected to stay pipelined
+		verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH);
+	}
+
+	/**
+	 * Builds the program shown in the class comment, compiles it under the given
+	 * execution mode, and checks the data exchange mode of every connection.
+	 *
+	 * @param execMode the execution mode set on the environment's config
+	 * @param toMap expected mode of the map's input connection
+	 * @param toFilter expected mode of the filter's input connection
+	 * @param toKeyExtractor expected mode of the key extractor's input connection
+	 * @param toCombiner expected mode of the combiner's input connection
+	 * @param toReduce expected mode of the group reducer's input connection
+	 * @param toSink expected mode of the sink's input connection
+	 * @throws Exception if the program cannot be built or compiled
+	 */
+	private void verifySimpleForwardPlan(ExecutionMode execMode,
+			DataExchangeMode toMap,
+			DataExchangeMode toFilter,
+			DataExchangeMode toKeyExtractor,
+			DataExchangeMode toCombiner,
+			DataExchangeMode toReduce,
+			DataExchangeMode toSink) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(execMode);
+
+		// the input path is never read: the program is only compiled, never executed
+		DataSet<String> dataSet = env.readTextFile("/never/accessed");
+		dataSet
+			.map(new MapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) {
+					return 0;
+				}
+			})
+			.filter(new FilterFunction<Integer>() {
+				@Override
+				public boolean filter(Integer value) {
+					return false;
+				}
+			})
+			.groupBy(new IdentityKeyExtractor<Integer>())
+			.reduceGroup(new Top1GroupReducer<Integer>())
+			.output(new DiscardingOutputFormat<Integer>());
+
+		OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+		SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
+
+		// walk the optimized plan backwards from the sink; the grouping on a key selector is
+		// expected to yield a key extractor and a combiner in front of the group reducer
+		SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+		SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
+		SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
+		SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+		assertEquals("map input", toMap, mapNode.getInput().getDataExchangeMode());
+		assertEquals("filter input", toFilter, filterNode.getInput().getDataExchangeMode());
+		assertEquals("key extractor input", toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
+		assertEquals("combiner input", toCombiner, combineNode.getInput().getDataExchangeMode());
+		assertEquals("reduce input", toReduce, reduceNode.getInput().getDataExchangeMode());
+		assertEquals("sink input", toSink, sinkNode.getInput().getDataExchangeMode());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
new file mode 100644
index 0000000..6b2691a
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, but do not re-join the branches.
+ *
+ * <pre>
+ *                      /---> (filter) -> (sink)
+ *                     /
+ *                    /
+ * (source) -> (map) -----------------\
+ *                    \                (join) -> (sink)
+ *                     \   (source) --/
+ *                      \
+ *                       \
+ *                        \-> (sink)
+ * </pre>
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelinedForced() throws Exception {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifyBranchingPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() throws Exception {
+		// PIPELINED should result in pipelining all the way
+		verifyBranchingPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatch() throws Exception {
+		// BATCH should batch only the two inputs of the join;
+		// all other connections stay pipelined
+		verifyBranchingPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testBatchForced() throws Exception {
+		// BATCH_FORCED should result in batching all the way
+		verifyBranchingPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+				DataExchangeMode.BATCH);
+	}
+
+	/**
+	 * Builds the branching program shown in the class comment, compiles it under the
+	 * given execution mode, and checks the data exchange mode of every connection.
+	 *
+	 * @param execMode the execution mode set on the environment's config
+	 * @param toMap expected mode of the map's input connection
+	 * @param toFilter expected mode of the filter's input connection
+	 * @param toFilterSink expected mode of the input connection of the sink after the filter
+	 * @param toJoin1 expected mode of the join's first input connection (from the map)
+	 * @param toJoin2 expected mode of the join's second input connection (from the fromElements source)
+	 * @param toJoinSink expected mode of the input connection of the sink after the join
+	 * @param toDirectSink expected mode of the input connection of the sink fed directly by the map
+	 * @throws Exception if the program cannot be built or compiled
+	 */
+	private void verifyBranchingPlan(ExecutionMode execMode,
+			DataExchangeMode toMap,
+			DataExchangeMode toFilter,
+			DataExchangeMode toFilterSink,
+			DataExchangeMode toJoin1,
+			DataExchangeMode toJoin2,
+			DataExchangeMode toJoinSink,
+			DataExchangeMode toDirectSink) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(execMode);
+
+		DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
+			.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+				@Override
+				public Tuple2<Long, Long> map(Long value) {
+					return new Tuple2<Long, Long>(value, value);
+				}
+			});
+
+		// output 1: filter, then sink
+		data
+			.filter(new FilterFunction<Tuple2<Long, Long>>() {
+				@Override
+				public boolean filter(Tuple2<Long, Long> value) {
+					return false;
+				}
+			})
+			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
+
+		// output 2: join with a second source, then sink
+		data
+			.join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
+			.where(1)
+			.equalTo(0)
+			.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
+
+		// output 3: sink directly after the map
+		data
+			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
+
+		OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+		SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
+		SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
+		SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
+
+		SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
+		SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+		DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
+
+		// sanity checks: all three branches must hang off the same map node
+		assertEquals(mapNode, joinNode.getInput1().getSource());
+		assertEquals(mapNode, directSink.getPredecessor());
+
+		assertEquals("filter sink input", toFilterSink, filterSink.getInput().getDataExchangeMode());
+		assertEquals("join sink input", toJoinSink, joinSink.getInput().getDataExchangeMode());
+		assertEquals("direct sink input", toDirectSink, directSink.getInput().getDataExchangeMode());
+
+		assertEquals("map input", toMap, mapNode.getInput().getDataExchangeMode());
+		assertEquals("filter input", toFilter, filterNode.getInput().getDataExchangeMode());
+
+		assertEquals("join input 1", toJoin1, joinNode.getInput1().getDataExchangeMode());
+		assertEquals("join input 2", toJoin2, joinNode.getInput2().getDataExchangeMode());
+	}
+
+	/**
+	 * Looks up a sink by the name of its operator.
+	 *
+	 * @param collection the sinks of an optimized plan
+	 * @param name the operator name to look for
+	 * @return the first sink, in iteration order, whose operator name equals {@code name}
+	 * @throws IllegalArgumentException if no sink in the collection has that name
+	 */
+	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+		for (SinkPlanNode node : collection) {
+			String nodeName = node.getOptimizerNode().getOperator().getName();
+			if (nodeName != null && nodeName.equals(name)) {
+				return node;
+			}
+		}
+
+		throw new IllegalArgumentException("No sink named '" + name + "' was found.");
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
new file mode 100644
index 0000000..1a14be5
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.traversals.BranchesVisitor;
+import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
+import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test checks whether connections are correctly marked as pipeline breaking.
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakingTest {
+
+	/**
+	 * Tests that no pipeline breakers are inserted into a simple forward
+	 * pipeline.
+	 *
+	 * <pre>
+	 *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+	 * </pre>
+	 */
+	@Test
+	public void testSimpleForwardPlan() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> dataSet = env.readTextFile("/never/accessed");
+		dataSet
+			.map(new MapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) {
+					return 0;
+				}
+			})
+			.filter(new FilterFunction<Integer>() {
+				@Override
+				public boolean filter(Integer value) {
+					return false;
+				}
+			})
+			.groupBy(new IdentityKeyExtractor<Integer>())
+			.reduceGroup(new Top1GroupReducer<Integer>())
+			.output(new DiscardingOutputFormat<Integer>());
+
+		DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
+
+		// the grouping on a key selector is expected to put a key extractor in front of the group reducer
+		SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
+		SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
+		SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
+		SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+		assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
+		assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
+		assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
+		assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+		assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+	}
+
+	/**
+	 * Tests that branching plans, where the branches are not re-joined,
+	 * do not place pipeline breakers.
+	 *
+	 * <pre>
+	 *                      /---> (filter) -> (sink)
+	 *                     /
+	 *                    /
+	 * (source) -> (map) -----------------\
+	 *                    \                (join) -> (sink)
+	 *                     \   (source) --/
+	 *                      \
+	 *                       \
+	 *                        \-> (sink)
+	 * </pre>
+	 */
+	@Test
+	public void testBranchingPlanNotReJoined() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> data = env.readTextFile("/never/accessed")
+			.map(new MapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) {
+					return 0;
+				}
+			});
+
+		// output 1: filter, then sink
+		data
+			.filter(new FilterFunction<Integer>() {
+				@Override
+				public boolean filter(Integer value) {
+					return false;
+				}
+			})
+			.output(new DiscardingOutputFormat<Integer>());
+
+		// output 2: join with a second source, then sink
+		data
+			.join(env.fromElements(1, 2, 3, 4))
+			.where(new IdentityKeyExtractor<Integer>())
+			.equalTo(new IdentityKeyExtractor<Integer>())
+			.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+		// output 3: sink directly after the map
+		data
+			.output(new DiscardingOutputFormat<Integer>());
+
+		List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+		// gather the optimizer DAG nodes
+
+		DataSinkNode sinkAfterFilter = sinks.get(0);
+		DataSinkNode sinkAfterJoin = sinks.get(1);
+		DataSinkNode sinkDirect = sinks.get(2);
+
+		SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
+		SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+		TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
+		SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+		// verify the non-pipeline breaking status
+
+		assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
+		assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
+		assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
+
+		assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+		assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+
+		assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+		assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+		assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
+
+		// some other sanity checks on the plan construction (cannot hurt)
+
+		assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
+		assertEquals(mapNode, sinkDirect.getPredecessorNode());
+	}
+
+	/**
+	 * Tests that pipeline breakers are placed on branches that are re-joined.
+	 *
+	 * <pre>
+	 *                                     /-> (sink)
+	 *                                    /
+	 *                     /-> (reduce) -+           /-> (flatmap) -> (sink)
+	 *                    /               \         /
+	 * (source) -> (map) -                 (join) -+-----\
+	 *                    \               /               \
+	 *                     \-> (filter) -+                 \
+	 *                                    \                 (co group) -> (sink)
+	 *                                     \               /
+	 *                                      \-> (reduce) -/
+	 * </pre>
+	 */
+	@Test
+	public void testReJoinedBranches() throws Exception {
+		// build a test program
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+			.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+				@Override
+				public Tuple2<Long, Long> map(Long value) {
+					return new Tuple2<Long, Long>(value, value);
+				}
+			});
+
+		DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+		reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+		DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+			@Override
+			public boolean filter(Tuple2<Long, Long> value) throws Exception {
+				return false;
+			}
+		});
+
+		DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+			.where(1).equalTo(1)
+			.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+		joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+		joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+			.where(0).equalTo(0)
+			.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+			.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+		List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+		// gather the optimizer DAG nodes
+
+		DataSinkNode sinkAfterReduce = sinks.get(0);
+		DataSinkNode sinkAfterFlatMap = sinks.get(1);
+		DataSinkNode sinkAfterCoGroup = sinks.get(2);
+
+		SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
+		SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+		SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
+		TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
+		SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+		TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
+		SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
+
+		// test sanity checks (that we constructed the DAG correctly)
+
+		assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
+		assertEquals(mapNode, filterNode.getPredecessorNode());
+		assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
+		assertEquals(filterNode, otherReduceNode.getPredecessorNode());
+
+		// verify the pipeline breaking status
+
+		assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
+		assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
+		assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
+
+		assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+		assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
+		assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+		assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
+		assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
+
+		// these should be pipeline breakers
+		assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
+		assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
+		assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
+		assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+	}
+
+	/**
+	 * Translates the given program plan into the optimizer DAG, joins all data sinks
+	 * under a single root, and runs the ID/estimates and branch tracking traversals on it.
+	 *
+	 * @param p the program plan to translate
+	 * @return the data sinks of the created DAG, as returned by {@link GraphCreatingVisitor#getSinks()}
+	 * @throws IllegalStateException if the plan has no data sinks
+	 */
+	private static List<DataSinkNode> convertPlan(Plan p) {
+		// NOTE(review): 17 is presumably the default parallelism; no assertion in this class refers to it
+		GraphCreatingVisitor dagCreator =
+				new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+
+		// create the DAG
+		p.accept(dagCreator);
+		List<DataSinkNode> sinks = dagCreator.getSinks();
+
+		if (sinks.isEmpty()) {
+			throw new IllegalStateException("The plan has no data sinks.");
+		}
+
+		// build a single root (chaining multiple sinks with SinkJoiners) and run the branch tracking logic
+		Iterator<DataSinkNode> iter = sinks.iterator();
+		OptimizerNode rootNode = iter.next();
+		while (iter.hasNext()) {
+			rootNode = new SinkJoiner(rootNode, iter.next());
+		}
+		rootNode.accept(new IdAndEstimatesVisitor(null));
+		rootNode.accept(new BranchesVisitor());
+
+		return sinks;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..3e32905
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; + +public class GlobalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test + public void testAllErased1() { + + SingleInputSemanticProperties semProps = new SingleInputSemanticProperties(); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1)); + gprops.addUniqueFieldCombination(new FieldSet(3, 4)); + gprops.addUniqueFieldCombination(new FieldSet(5, 6)); + + GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0); + + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + assertNull(result.getPartitioningOrdering()); + assertNull(result.getUniqueFieldCombination()); + } + + @Test + public void testAllErased2() { + + SingleInputSemanticProperties semProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[]{"2"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1)); + gprops.addUniqueFieldCombination(new FieldSet(3, 4)); + gprops.addUniqueFieldCombination(new FieldSet(5, 6)); + + GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0); + + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + assertNull(result.getPartitioningOrdering()); + assertNull(result.getUniqueFieldCombination()); + } + + @Test + public void testHashPartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + 
GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(0)); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(4)); + } + + @Test + public void testHashPartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(2)); + assertTrue(pFields.contains(3)); + } + + @Test + public void testHashPartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testAnyPartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setAnyPartitioning(new 
FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(0)); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(4)); + } + + @Test + public void testAnyPartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setAnyPartitioning(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(2)); + assertTrue(pFields.contains(3)); + } + + @Test + public void testAnyPartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setAnyPartitioning(new FieldList(0, 1, 4)); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testCustomPartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + 
Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner(); + gprops.setCustomPartitioned(new FieldList(0, 4), myP); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(2, pFields.size()); + assertTrue(pFields.contains(0)); + assertTrue(pFields.contains(4)); + assertEquals(myP, result.getCustomPartitioner()); + } + + @Test + public void testCustomPartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner(); + gprops.setCustomPartitioned(new FieldList(0, 4), myP); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(2, pFields.size()); + assertTrue(pFields.contains(1)); + assertTrue(pFields.contains(3)); + assertEquals(myP, result.getCustomPartitioner()); + } + + @Test + public void testCustomPartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner(); + gprops.setCustomPartitioned(new FieldList(0, 4), myP); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + 
assertNull(result.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningPreserved1() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + o.appendOrdering(2, StringValue.class, Order.ASCENDING); + GlobalProperties gprops = new GlobalProperties(); + gprops.setRangePartitioned(o); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertEquals(1, pFields.get(0).intValue()); + assertEquals(5, pFields.get(1).intValue()); + assertEquals(2, pFields.get(2).intValue()); + Ordering pOrder = result.getPartitioningOrdering(); + assertEquals(3, pOrder.getNumberOfFields()); + assertEquals(1, pOrder.getFieldNumber(0).intValue()); + assertEquals(5, pOrder.getFieldNumber(1).intValue()); + assertEquals(2, pOrder.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, pOrder.getOrder(0)); + assertEquals(Order.DESCENDING, pOrder.getOrder(1)); + assertEquals(Order.ASCENDING, pOrder.getOrder(2)); + assertEquals(IntValue.class, pOrder.getType(0)); + assertEquals(LongValue.class, pOrder.getType(1)); + assertEquals(StringValue.class, pOrder.getType(2)); + } + + @Test + public void testRangePartitioningPreserved2() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + 
o.appendOrdering(2, StringValue.class, Order.ASCENDING); + GlobalProperties gprops = new GlobalProperties(); + gprops.setRangePartitioned(o); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning()); + FieldList pFields = result.getPartitioningFields(); + assertEquals(3, pFields.size()); + assertEquals(3, pFields.get(0).intValue()); + assertEquals(1, pFields.get(1).intValue()); + assertEquals(0, pFields.get(2).intValue()); + Ordering pOrder = result.getPartitioningOrdering(); + assertEquals(3, pOrder.getNumberOfFields()); + assertEquals(3, pOrder.getFieldNumber(0).intValue()); + assertEquals(1, pOrder.getFieldNumber(1).intValue()); + assertEquals(0, pOrder.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, pOrder.getOrder(0)); + assertEquals(Order.DESCENDING, pOrder.getOrder(1)); + assertEquals(Order.ASCENDING, pOrder.getOrder(2)); + assertEquals(IntValue.class, pOrder.getType(0)); + assertEquals(LongValue.class, pOrder.getType(1)); + assertEquals(StringValue.class, pOrder.getType(2)); + } + + @Test + public void testRangePartitioningErased() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + o.appendOrdering(2, StringValue.class, Order.ASCENDING); + GlobalProperties gprops = new GlobalProperties(); + gprops.setRangePartitioned(o); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); + assertNull(result.getPartitioningOrdering()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testRebalancingPreserved() { + + 
SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setForcedRebalanced(); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + + assertEquals(PartitioningProperty.FORCED_REBALANCED, result.getPartitioning()); + assertNull(result.getPartitioningFields()); + } + + @Test + public void testUniqueFieldGroupsPreserved1() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo); + + FieldSet set1 = new FieldSet(0, 1, 2); + FieldSet set2 = new FieldSet(3, 4); + FieldSet set3 = new FieldSet(4, 5, 6, 7); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + gprops.addUniqueFieldCombination(set3); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Set<FieldSet> unique = result.getUniqueFieldCombination(); + FieldSet expected1 = new FieldSet(0, 1, 2); + FieldSet expected2 = new FieldSet(3, 4); + + Assert.assertTrue(unique.size() == 2); + Assert.assertTrue(unique.contains(expected1)); + Assert.assertTrue(unique.contains(expected2)); + } + + @Test + public void testUniqueFieldGroupsPreserved2() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo); + + FieldSet set1 = new FieldSet(0, 1, 2); + FieldSet set2 = new FieldSet(3, 4); + FieldSet set3 = new FieldSet(4, 5, 6, 7); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + 
gprops.addUniqueFieldCombination(set3); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Set<FieldSet> unique = result.getUniqueFieldCombination(); + FieldSet expected1 = new FieldSet(1, 2, 5); + FieldSet expected2 = new FieldSet(4, 6); + + Assert.assertTrue(unique.size() == 2); + Assert.assertTrue(unique.contains(expected1)); + Assert.assertTrue(unique.contains(expected2)); + } + + @Test + public void testUniqueFieldGroupsErased() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo); + + FieldSet set1 = new FieldSet(0, 1, 2); + FieldSet set2 = new FieldSet(3, 4); + FieldSet set3 = new FieldSet(4, 5, 6, 7); + GlobalProperties gprops = new GlobalProperties(); + gprops.addUniqueFieldCombination(set1); + gprops.addUniqueFieldCombination(set2); + gprops.addUniqueFieldCombination(set3); + + GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); + Assert.assertNull(result.getUniqueFieldCombination()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + GlobalProperties gprops = new GlobalProperties(); + gprops.setHashPartitioned(new FieldList(0, 1)); + + gprops.filterBySemanticProperties(sprops, 1); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java new file mode 100644 index 0000000..52826d6 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.optimizer.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Test; + +/** + * Tests for {@link RequestedGlobalProperties#isMetBy}, i.e. which produced {@link GlobalProperties} + * satisfy a requested partitioning. Requests are built both on unordered ({@link FieldSet}) and + * on ordered ({@link FieldList}) partitioning fields. + */ +public class GlobalPropertiesMatchingTest { + + @Test + public void testMatchingAnyPartitioning() { + try { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setAnyPartitioning(new FieldSet(6, 2)); + + // match any partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(2, 6)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 2, 4)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp4)); + + GlobalProperties gp5 = new GlobalProperties(); + gp5.setAnyPartitioning(new FieldList(2)); + assertTrue(req.isMetBy(gp5)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(2, 6)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + } + + // match range partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + 
+ assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + assertTrue(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertTrue(req.isMetBy(gp4)); + } + + // match custom partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setCustomPartitioned(new FieldList(2, 6), new MockPartitioner()); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner()); + assertTrue(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setCustomPartitioned(new FieldList(6, 1), new MockPartitioner()); + assertFalse(req.isMetBy(gp3)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMatchingCustomPartitioning() { + try { + final Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner(); + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setCustomPartitioned(new FieldSet(6, 2), partitioner); + + // match custom partitionings + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setCustomPartitioned(new FieldList(2, 6), partitioner); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setCustomPartitioned(new FieldList(6, 2), partitioner); + assertTrue(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner()); + assertFalse(req.isMetBy(gp3)); + } + + // cannot match other types of partitionings + { + GlobalProperties gp1 = 
+ new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(6, 2)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testStrictlyMatchingAnyPartitioning() { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + // an ordered FieldList request is only met by a partitioning on exactly these fields, in this order + req.setAnyPartitioning(new FieldList(6, 2)); + + // match any partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 2, 3)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp4)); + + GlobalProperties gp5 = new GlobalProperties(); + gp5.setAnyPartitioning(new FieldList(2)); + assertFalse(req.isMetBy(gp5)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + } + + // match range partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + 
gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, 
+ Order.ASCENDING)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertFalse(req.isMetBy(gp4)); + } + + } + + @Test + public void testStrictlyMatchingHashPartitioning() { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setHashPartitioned(new FieldList(6, 2)); + + // any partitioning does not meet a hash partitioning request + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(2)); + assertFalse(req.isMetBy(gp4)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setHashPartitioned(new FieldList(6, 2, 0)); + assertFalse(req.isMetBy(gp4)); + } + + // range partitioning does not meet a hash partitioning request + { + GlobalProperties gp1 = new 
GlobalProperties(); + 
+ gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertFalse(req.isMetBy(gp4)); + } + + } + + // -------------------------------------------------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java new file mode 100644 index 0000000..0868720 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.junit.Test; + +/** + * Tests for {@link RequestedGlobalProperties#filterBySemanticProperties}: any- and hash-partitioning + * requests are pushed down only when their fields are forwarded, while custom-partitioning and + * forced-rebalancing requests are not pushed down even if all fields are forwarded. + */ +public class GlobalPropertiesPushdownTest { + + @Test + public void testAnyPartitioningPushedDown() { + try { + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setAnyPartitioning(new FieldSet(3, 1)); + + RequestedGlobalProperties preserved = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); + assertEquals(PartitioningProperty.ANY_PARTITIONING, preserved.getPartitioning()); + assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3))); + + RequestedGlobalProperties nonPreserved = req.filterBySemanticProperties(getNonePreservingSemProps(), 0); + assertTrue(nonPreserved == null || nonPreserved.isTrivial()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testHashPartitioningPushedDown() { + try { + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setHashPartitioned(new FieldSet(3, 1)); + + RequestedGlobalProperties preserved = 
req.filterBySemanticProperties(getAllPreservingSemProps(), 0); + assertEquals(PartitioningProperty.HASH_PARTITIONED, preserved.getPartitioning()); + assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3))); + + RequestedGlobalProperties nonPreserved = 
+ req.filterBySemanticProperties(getNonePreservingSemProps(), 0); + assertTrue(nonPreserved == null || nonPreserved.isTrivial()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningNotPushedDown() { + try { + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner()); + + RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); + assertTrue(pushedDown == null || pushedDown.isTrivial()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testForcedRebalancingNotPushedDown() { + try { + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setForceRebalancing(); + + RequestedGlobalProperties pushedDown = req.filterBySemanticProperties(getAllPreservingSemProps(), 0); + assertTrue(pushedDown == null || pushedDown.isTrivial()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static SemanticProperties getAllPreservingSemProps() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + private static SemanticProperties getNonePreservingSemProps() { + return new SingleInputSemanticProperties(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java new file mode 100644 index 0000000..1ff62ed --- /dev/null +++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.junit.Test; + +public class LocalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test + public void testAllErased1() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testAllErased2() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"5"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testGroupingPreserved1() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(0)); + 
assertTrue(filtered.getGroupedFields().contains(2)); + assertTrue(filtered.getGroupedFields().contains(3)); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testGroupingPreserved2() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(4)); + assertTrue(filtered.getGroupedFields().contains(0)); + assertTrue(filtered.getGroupedFields().contains(7)); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testGroupingErased() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved1() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties 
filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(3, gFields.size()); + assertTrue(gFields.contains(0)); + assertTrue(gFields.contains(2)); + assertTrue(gFields.contains(5)); + assertNotNull(order); + assertEquals(3, order.getNumberOfFields()); + assertEquals(2, order.getFieldNumber(0).intValue()); + assertEquals(0, order.getFieldNumber(1).intValue()); + assertEquals(5, order.getFieldNumber(2).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(Order.DESCENDING, order.getOrder(1)); + assertEquals(Order.DESCENDING, order.getOrder(2)); + assertEquals(IntValue.class, order.getType(0)); + assertEquals(StringValue.class, order.getType(1)); + assertEquals(LongValue.class, order.getType(2)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved2() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(3, gFields.size()); + assertTrue(gFields.contains(3)); + assertTrue(gFields.contains(7)); + assertTrue(gFields.contains(1)); + assertNotNull(order); + assertEquals(3, order.getNumberOfFields()); + assertEquals(7, order.getFieldNumber(0).intValue()); + assertEquals(3, order.getFieldNumber(1).intValue()); + assertEquals(1, order.getFieldNumber(2).intValue()); + 
assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(Order.DESCENDING, order.getOrder(1)); + assertEquals(Order.DESCENDING, order.getOrder(2)); + assertEquals(IntValue.class, order.getType(0)); + assertEquals(StringValue.class, order.getType(1)); + assertEquals(LongValue.class, order.getType(2)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved3() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(2, gFields.size()); + assertTrue(gFields.contains(0)); + assertTrue(gFields.contains(2)); + assertNotNull(order); + assertEquals(2, order.getNumberOfFields()); + assertEquals(2, order.getFieldNumber(0).intValue()); + assertEquals(0, order.getFieldNumber(1).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(Order.DESCENDING, order.getOrder(1)); + assertEquals(IntValue.class, order.getType(0)); + assertEquals(StringValue.class, order.getType(1)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingPreserved4() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + 
o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNotNull(gFields); + assertEquals(1, gFields.size()); + assertTrue(gFields.contains(7)); + assertNotNull(order); + assertEquals(1, order.getNumberOfFields()); + assertEquals(7, order.getFieldNumber(0).intValue()); + assertEquals(Order.ASCENDING, order.getOrder(0)); + assertEquals(IntValue.class, order.getType(0)); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testSortingErased() { + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;5"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(2, IntValue.class, Order.ASCENDING); + o.appendOrdering(0, StringValue.class, Order.DESCENDING); + o.appendOrdering(5, LongValue.class, Order.DESCENDING); + LocalProperties lProps = LocalProperties.forOrdering(o); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldList gFields = filtered.getGroupedFields(); + Ordering order = filtered.getOrdering(); + + assertNull(gFields); + assertNull(order); + assertNull(filtered.getUniqueFields()); + } + + @Test + public void testUniqueFieldsPreserved1() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = new LocalProperties(); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldSet expected1 = new FieldSet(0,1,2); + 
FieldSet expected2 = new FieldSet(3,4); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNotNull(filtered.getUniqueFields()); + assertEquals(2, filtered.getUniqueFields().size()); + assertTrue(filtered.getUniqueFields().contains(expected1)); + assertTrue(filtered.getUniqueFields().contains(expected2)); + } + + @Test + public void testUniqueFieldsPreserved2() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = LocalProperties.forGrouping(new FieldList(1,2)); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + FieldSet expected1 = new FieldSet(0,1,2); + FieldSet expected2 = new FieldSet(3,4); + + assertNull(filtered.getOrdering()); + assertNotNull(filtered.getGroupedFields()); + assertEquals(2, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(1)); + assertTrue(filtered.getGroupedFields().contains(2)); + assertNotNull(filtered.getUniqueFields()); + assertEquals(2, filtered.getUniqueFields().size()); + assertTrue(filtered.getUniqueFields().contains(expected1)); + assertTrue(filtered.getUniqueFields().contains(expected2)); + } + + @Test + public void testUniqueFieldsPreserved3() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = new LocalProperties(); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0); + FieldSet expected1 = new FieldSet(5,6,7); + FieldSet expected2 = new FieldSet(3,4); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNotNull(filtered.getUniqueFields()); + assertEquals(2, filtered.getUniqueFields().size()); + assertTrue(filtered.getUniqueFields().contains(expected1)); + assertTrue(filtered.getUniqueFields().contains(expected2)); + } + + @Test + public void testUniqueFieldsErased() { + + SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lProps = new LocalProperties(); + lProps = lProps.addUniqueFields(new FieldSet(0,1,2)); + lProps = lProps.addUniqueFields(new FieldSet(3,4)); + lProps = lProps.addUniqueFields(new FieldSet(4,5,6)); + + LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0); + + assertNull(filtered.getGroupedFields()); + assertNull(filtered.getOrdering()); + assertNull(filtered.getUniqueFields()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + LocalProperties lprops = LocalProperties.forGrouping(new FieldList(0,1)); + + lprops.filterBySemanticProperties(sprops, 1); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java new file mode 100644 index 0000000..74126f8 
--- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.Key; + +import java.io.IOException; + +@SuppressWarnings("serial") +public class MockDistribution implements DataDistribution { + + @Override + public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + return new Key<?>[0]; + } + + @Override + public int getNumberOfFields() { + return 0; + } + + @Override + public void write(DataOutputView out) throws IOException { + + } + + @Override + public void read(DataInputView in) throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java ---------------------------------------------------------------------- diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java new file mode 100644 index 0000000..2b2ab14 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; + +class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> { + + private static final long serialVersionUID = 1L; + + @Override + public int partition(Tuple2<Long, Integer> key, int numPartitions) { + return 0; + } +} \ No newline at end of file