http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java deleted file mode 100644 index 0273659..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial","unchecked"}) -public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase { - - @Test - public void testCompatiblePartitioningJoin() { - try { - final Partitioner<Long> partitioner = new Partitioner<Long>() { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - }; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - input1.partitionCustom(partitioner, 1) - .join(input2.partitionCustom(partitioner, 0)) - .where(1).equalTo(0) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode partitioner1 = (SingleInputPlanNode) join.getInput1().getSource(); - SingleInputPlanNode partitioner2 = (SingleInputPlanNode) join.getInput2().getSource(); - - assertEquals(ShipStrategyType.FORWARD, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, join.getInput2().getShipStrategy()); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy()); - assertEquals(partitioner, partitioner1.getInput().getPartitioner()); - assertEquals(partitioner, partitioner2.getInput().getPartitioner()); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCompatiblePartitioningCoGroup() { - try { - final Partitioner<Long> partitioner = new Partitioner<Long>() { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - }; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - input1.partitionCustom(partitioner, 1) - .coGroup(input2.partitionCustom(partitioner, 0)) - .where(1).equalTo(0) - .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode partitioner1 = (SingleInputPlanNode) coGroup.getInput1().getSource(); - SingleInputPlanNode partitioner2 = (SingleInputPlanNode) coGroup.getInput2().getSource(); - - assertEquals(ShipStrategyType.FORWARD, coGroup.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, coGroup.getInput2().getShipStrategy()); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy()); - assertEquals(partitioner, partitioner1.getInput().getPartitioner()); - assertEquals(partitioner, partitioner2.getInput().getPartitioner()); - - new JobGraphGenerator().compileJobGraph(op); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java deleted file mode 100644 index 08f7388..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial", "unchecked"}) -public class CoGroupCustomPartitioningTest extends CompilerTestBase { - - @Test - public void testCoGroupWithTuples() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - input1 - .coGroup(input2) - .where(1).equalTo(0) - .withPartitioner(partitioner) - .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCoGroupWithTuplesWrongType() { - try { - final Partitioner<Integer> partitioner = new TestPartitionerInt(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - try { - input1 - .coGroup(input2) - .where(1).equalTo(0) - .withPartitioner(partitioner); - fail("should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCoGroupWithPojos() { - try { - final Partitioner<Integer> partitioner = new TestPartitionerInt(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - input1 - .coGroup(input2) - .where("b").equalTo("a") - .withPartitioner(partitioner) - .with(new DummyCoGroupFunction<Pojo2, Pojo3>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCoGroupWithPojosWrongType() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - try { - input1 - .coGroup(input2) - .where("a").equalTo("b") - .withPartitioner(partitioner); - - fail("should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCoGroupWithKeySelectors() { - try { - final Partitioner<Integer> partitioner = new TestPartitionerInt(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - input1 - .coGroup(input2) - .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) - .withPartitioner(partitioner) - .with(new DummyCoGroupFunction<Pojo2, Pojo3>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCoGroupWithKeySelectorsWrongType() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - try { - input1 - .coGroup(input2) - .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) - .withPartitioner(partitioner); - - fail("should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testIncompatibleHashAndCustomPartitioning() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - DataSet<Tuple3<Long, Long, Long>> partitioned = input - .partitionCustom(new Partitioner<Long>() { - @Override - public int partition(Long key, int numPartitions) { return 0; } - }, 0) - .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2"); - - - DataSet<Tuple3<Long, Long, Long>> grouped = partitioned - .distinct(0, 1) - .groupBy(1) - .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); - - grouped - .coGroup(partitioned).where(0).equalTo(0) - .with(new DummyCoGroupFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy()); - assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || - coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static class TestPartitionerInt implements Partitioner<Integer> { - @Override - public int partition(Integer key, int numPartitions) { - return 0; - } - } - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } - - public static class Pojo2 { - public int a; - public int b; - } - - public static class Pojo3 { - public int a; - public int b; - public int c; - } - - private static class Pojo2KeySelector implements KeySelector<Pojo2, Integer> { - @Override - public Integer getKey(Pojo2 value) { - return value.a; - } - } - - private static class Pojo3KeySelector implements KeySelector<Pojo3, Integer> { - @Override - public Integer getKey(Pojo3 value) { - return value.b; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java deleted file mode 100644 index 9fd676f..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial", "unchecked"}) -public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase { - - @Test - public void testJoinReduceCombination() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - DataSet<Tuple3<Long, Long, Long>> joined = input1.join(input2) - .where(1).equalTo(0) - .projectFirst(0, 1) - .<Tuple3<Long, Long, Long>>projectSecond(2) - .withPartitioner(partitioner); - - joined.groupBy(1).withPartitioner(partitioner) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - - assertTrue("Reduce is not chained, property reuse does not happen", - reducer.getInput().getSource() instanceof DualInputPlanNode); - - DualInputPlanNode join = (DualInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - - assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java deleted file mode 100644 index d397ea2..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityPartitionerMapper; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial", "unchecked"}) -public class CustomPartitioningTest extends CompilerTestBase { - - @Test - public void testPartitionTuples() { - try { - final Partitioner<Integer> part = new TestPartitionerInt(); - final int parallelism = 4; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0)) - .rebalance(); - - data - .partitionCustom(part, 0) - .mapPartition(new IdentityPartitionerMapper<Tuple2<Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode partitioner = (SingleInputPlanNode) mapper.getInput().getSource(); - SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(parallelism, sink.getParallelism()); - - assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); - assertEquals(parallelism, mapper.getParallelism()); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); - assertEquals(part, partitioner.getInput().getPartitioner()); - assertEquals(parallelism, partitioner.getParallelism()); - - assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); - assertEquals(parallelism, balancer.getParallelism()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPartitionTuplesInvalidType() { - try { - final int parallelism = 4; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0)) - .rebalance(); - - try { - data - .partitionCustom(new TestPartitionerLong(), 0); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPartitionPojo() { - try { - final Partitioner<Integer> part = new TestPartitionerInt(); - final int parallelism = 4; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); - - DataSet<Pojo> data = env.fromElements(new Pojo()) - .rebalance(); - - data - .partitionCustom(part, "a") - .mapPartition(new IdentityPartitionerMapper<Pojo>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode partitioner = (SingleInputPlanNode) mapper.getInput().getSource(); - SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(parallelism, sink.getParallelism()); - - assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); - assertEquals(parallelism, mapper.getParallelism()); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); - assertEquals(part, partitioner.getInput().getPartitioner()); - assertEquals(parallelism, partitioner.getParallelism()); - - assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); - assertEquals(parallelism, balancer.getParallelism()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPartitionPojoInvalidType() { - try { - final int parallelism = 4; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); - - DataSet<Pojo> data = env.fromElements(new Pojo()) - .rebalance(); - - try { - data - .partitionCustom(new TestPartitionerLong(), "a"); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPartitionKeySelector() { - try { - final Partitioner<Integer> part = new TestPartitionerInt(); - final int parallelism = 4; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); - - DataSet<Pojo> data = env.fromElements(new Pojo()) - .rebalance(); - - data - .partitionCustom(part, new TestKeySelectorInt<Pojo>()) - .mapPartition(new IdentityPartitionerMapper<Pojo>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode keyRemover = (SingleInputPlanNode) mapper.getInput().getSource(); - SingleInputPlanNode partitioner = (SingleInputPlanNode) keyRemover.getInput().getSource(); - SingleInputPlanNode keyExtractor = (SingleInputPlanNode) partitioner.getInput().getSource(); - SingleInputPlanNode balancer = (SingleInputPlanNode) keyExtractor.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(parallelism, sink.getParallelism()); - - assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); - assertEquals(parallelism, mapper.getParallelism()); - - assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput().getShipStrategy()); - assertEquals(parallelism, keyRemover.getParallelism()); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); - assertEquals(part, partitioner.getInput().getPartitioner()); - assertEquals(parallelism, partitioner.getParallelism()); - - assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput().getShipStrategy()); - assertEquals(parallelism, keyExtractor.getParallelism()); - - assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); - assertEquals(parallelism, balancer.getParallelism()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPartitionKeySelectorInvalidType() { - try { - final Partitioner<Integer> part = (Partitioner<Integer>) (Partitioner<?>) new TestPartitionerLong(); - final int parallelism = 4; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); - - DataSet<Pojo> data = env.fromElements(new Pojo()) - .rebalance(); - - try { - data - .partitionCustom(part, new TestKeySelectorInt<Pojo>()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - public static class Pojo { - public int a; - public int b; - } - - private static class TestPartitionerInt implements Partitioner<Integer> { - @Override - public int partition(Integer key, int numPartitions) { - return 0; - } - } - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } - - private static class TestKeySelectorInt<T> implements KeySelector<T, Integer> { - @Override - public Integer getKey(T value) { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java deleted file mode 100644 index 360487b..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial", "unchecked"}) -public class GroupingKeySelectorTranslationTest extends CompilerTestBase { - - @Test - public void testCustomPartitioningKeySelectorReduce() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>()) - .withPartitioner(new TestPartitionerInt()) - .reduce(new DummyReducer<Tuple2<Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode keyRemovingMapper = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode reducer = (SingleInputPlanNode) keyRemovingMapper.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, keyRemovingMapper.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningKeySelectorGroupReduce() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>()) - .withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningKeySelectorGroupReduceSorted() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>()) - .withPartitioner(new TestPartitionerInt()) - .sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningKeySelectorInvalidType() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - try { - data - .groupBy(new TestKeySelector<Tuple2<Integer,Integer>>()) - .withPartitioner(new TestPartitionerLong()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningKeySelectorInvalidTypeSorted() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0)) - .rebalance().setParallelism(4); - - try { - data - .groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>()) - .sortGroup(1, Order.ASCENDING) - .withPartitioner(new TestPartitionerLong()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleRejectCompositeKey() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0)) - .rebalance().setParallelism(4); - - try { - data - .groupBy(new TestBinaryKeySelector<Tuple3<Integer,Integer,Integer>>()) - .withPartitioner(new TestPartitionerInt()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static class TestPartitionerInt implements Partitioner<Integer> { - @Override - public int partition(Integer key, int numPartitions) { - return 0; - } - } - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } - - private static class TestKeySelector<T extends Tuple> implements KeySelector<T, Integer> { - @Override - public Integer getKey(T value) { - return value.getField(0); - } - } - - private static class TestBinaryKeySelector<T extends Tuple> implements KeySelector<T, Tuple2<Integer, Integer>> { - @Override - public Tuple2<Integer, Integer> getKey(T value) { - return new Tuple2<Integer, Integer>(value.<Integer>getField(0), value.<Integer>getField(1)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java deleted file mode 100644 index 8cd4809..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings("serial") -public class GroupingPojoTranslationTest extends CompilerTestBase { - - @Test - public void testCustomPartitioningTupleReduce() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> data = env.fromElements(new Pojo2()) - .rebalance().setParallelism(4); - - data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .reduce(new DummyReducer<Pojo2>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleGroupReduce() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> data = env.fromElements(new Pojo2()) - .rebalance().setParallelism(4); - - data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer<Pojo2>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleGroupReduceSorted() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo3> data = env.fromElements(new Pojo3()) - .rebalance().setParallelism(4); - - data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .sortGroup("b", Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Pojo3>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleGroupReduceSorted2() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo4> data = env.fromElements(new Pojo4()) - .rebalance().setParallelism(4); - - data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .sortGroup("b", Order.ASCENDING) - .sortGroup("c", Order.DESCENDING) - .reduceGroup(new IdentityGroupReducer<Pojo4>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleInvalidType() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> data = env.fromElements(new Pojo2()) - .rebalance().setParallelism(4); - - try { - data.groupBy("a").withPartitioner(new TestPartitionerLong()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleInvalidTypeSorted() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo3> data = env.fromElements(new Pojo3()) - .rebalance().setParallelism(4); - - try { - data.groupBy("a") - .sortGroup("b", Order.ASCENDING) - .withPartitioner(new TestPartitionerLong()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleRejectCompositeKey() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> data = env.fromElements(new Pojo2()) - .rebalance().setParallelism(4); - - try { - data.groupBy("a", "b") - .withPartitioner(new TestPartitionerInt()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - public static class Pojo2 { - public int a; - public int b; - - } - - public static class Pojo3 { - public int a; - public int b; - public int c; - } - - public static class Pojo4 { - public int a; - public int b; - public int c; - public int d; - } - - private static class TestPartitionerInt implements Partitioner<Integer> { - @Override - public int partition(Integer key, int numPartitions) { - return 0; - } - } - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java deleted file mode 100644 index 779b8e5..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial", "unchecked"}) -public class GroupingTupleTranslationTest extends CompilerTestBase { - - @Test - public void testCustomPartitioningTupleAgg() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .sum(1) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleReduce() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .reduce(new DummyReducer<Tuple2<Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleGroupReduce() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleGroupReduceSorted() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .sortGroup(1, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleGroupReduceSorted2() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0)) - .rebalance().setParallelism(4); - - data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .sortGroup(1, Order.ASCENDING) - .sortGroup(2, Order.DESCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); - SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - - assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleInvalidType() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0)) - .rebalance().setParallelism(4); - - try { - data.groupBy(0).withPartitioner(new TestPartitionerLong()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleInvalidTypeSorted() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0)) - .rebalance().setParallelism(4); - - try { - data.groupBy(0) - .sortGroup(1, Order.ASCENDING) - .withPartitioner(new TestPartitionerLong()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCustomPartitioningTupleRejectCompositeKey() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0)) - .rebalance().setParallelism(4); - - try { - data.groupBy(0, 1) - .withPartitioner(new TestPartitionerInt()); - fail("Should throw an exception"); - } - catch (InvalidProgramException e) {} - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static class TestPartitionerInt implements Partitioner<Integer> { - @Override - public int partition(Integer key, int numPartitions) { - return 0; - } - } - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java deleted file mode 100644 index eae40cf..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.custompartition; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -@SuppressWarnings({"serial", "unchecked"}) -public class JoinCustomPartitioningTest extends CompilerTestBase { - - @Test - public void testJoinWithTuples() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - input1 - .join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testJoinWithTuplesWrongType() { - try { - final Partitioner<Integer> partitioner = new TestPartitionerInt(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); - DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - try { - input1 - .join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0) - .withPartitioner(partitioner); - - fail("should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testJoinWithPojos() { - try { - final Partitioner<Integer> partitioner = new TestPartitionerInt(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - input1 - .join(input2, JoinHint.REPARTITION_HASH_FIRST) - .where("b").equalTo("a").withPartitioner(partitioner) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testJoinWithPojosWrongType() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - try { - input1 - .join(input2, JoinHint.REPARTITION_HASH_FIRST) - .where("a").equalTo("b") - .withPartitioner(partitioner); - - fail("should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testJoinWithKeySelectors() { - try { - final Partitioner<Integer> partitioner = new TestPartitionerInt(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - input1 - .join(input2, JoinHint.REPARTITION_HASH_FIRST) - .where(new Pojo2KeySelector()) - .equalTo(new Pojo3KeySelector()) - .withPartitioner(partitioner) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); - assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); - assertEquals(partitioner, join.getInput1().getPartitioner()); - assertEquals(partitioner, join.getInput2().getPartitioner()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testJoinWithKeySelectorsWrongType() { - try { - final Partitioner<Long> partitioner = new TestPartitionerLong(); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Pojo2> input1 = env.fromElements(new Pojo2()); - DataSet<Pojo3> input2 = env.fromElements(new Pojo3()); - - try { - input1 - .join(input2, JoinHint.REPARTITION_HASH_FIRST) - .where(new Pojo2KeySelector()) - .equalTo(new Pojo3KeySelector()) - .withPartitioner(partitioner); - - fail("should throw an exception"); - } - catch (InvalidProgramException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testIncompatibleHashAndCustomPartitioning() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); - - DataSet<Tuple3<Long, Long, Long>> partitioned = input - .partitionCustom(new Partitioner<Long>() { - @Override - public int partition(Long key, int numPartitions) { return 0; } - }, 0) - .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2"); - - - DataSet<Tuple3<Long, Long, Long>> grouped = partitioned - .distinct(0, 1) - .groupBy(1) - .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); - - grouped - .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>()) - .print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); - - assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy()); - assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || - coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static class TestPartitionerInt implements Partitioner<Integer> { - @Override - public int partition(Integer key, int numPartitions) { - return 0; - } - } - - private static class TestPartitionerLong implements Partitioner<Long> { - @Override - public int partition(Long key, int numPartitions) { - return 0; - } - } - - public static class Pojo2 { - public int a; - public int b; - } - - public static class Pojo3 { - public int a; - public int b; - public int c; - } - - private static class Pojo2KeySelector implements KeySelector<Pojo2, Integer> { - @Override - public Integer getKey(Pojo2 value) { - return value.a; - } - } - - private static class Pojo3KeySelector implements KeySelector<Pojo3, Integer> { - @Override - public Integer getKey(Pojo3 value) { - return value.b; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java deleted file mode 100644 index cb4bd78..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.dataexchange; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; -import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; -import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper; -import org.apache.flink.optimizer.testfunctions.SelectOneReducer; -import org.apache.flink.optimizer.testfunctions.Top1GroupReducer; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.junit.Test; - -import java.util.Collection; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * This test checks the correct assignment of the DataExchangeMode to - * connections for programs that branch, and re-join those branches. - * - * <pre> - * /-> (sink) - * / - * /-> (reduce) -+ /-> (flatmap) -> (sink) - * / \ / - * (source) -> (map) - (join) -+-----\ - * \ / \ - * \-> (filter) -+ \ - * \ (co group) -> (sink) - * \ / - * \-> (reduce) - / - * </pre> - */ -@SuppressWarnings("serial") -public class DataExchangeModeClosedBranchingTest extends CompilerTestBase { - - @Test - public void testPipelinedForced() { - // PIPELINED_FORCED should result in pipelining all the way - verifyBranchingJoiningPlan(ExecutionMode.PIPELINED_FORCED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, - DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED); - } - - @Test - public void testPipelined() { - // PIPELINED should result in pipelining all the way - verifyBranchingJoiningPlan(ExecutionMode.PIPELINED, - DataExchangeMode.PIPELINED, // to map - DataExchangeMode.PIPELINED, // to combiner connections are pipelined - DataExchangeMode.BATCH, // to reduce - DataExchangeMode.BATCH, // to filter - DataExchangeMode.PIPELINED, // to sink after reduce - DataExchangeMode.PIPELINED, // to join (first input) - DataExchangeMode.BATCH, // to join (second input) - DataExchangeMode.PIPELINED, // combiner connections are pipelined - DataExchangeMode.BATCH, // to other reducer - DataExchangeMode.PIPELINED, // to flatMap - DataExchangeMode.PIPELINED, // to sink after flatMap - DataExchangeMode.PIPELINED, // to coGroup (first input) - DataExchangeMode.PIPELINED, // to coGroup (second input) - DataExchangeMode.PIPELINED // to sink after coGroup - ); - } - - @Test - public void testBatch() { - // BATCH should result in batching the shuffle all the way - verifyBranchingJoiningPlan(ExecutionMode.BATCH, - DataExchangeMode.PIPELINED, // to map - DataExchangeMode.PIPELINED, // to combiner connections are pipelined - DataExchangeMode.BATCH, // to reduce - DataExchangeMode.BATCH, // to filter - DataExchangeMode.PIPELINED, // to sink after reduce - DataExchangeMode.BATCH, // to join (first input) - DataExchangeMode.BATCH, // to join (second input) - DataExchangeMode.PIPELINED, // combiner connections are pipelined - DataExchangeMode.BATCH, // to other reducer - DataExchangeMode.PIPELINED, // to flatMap - DataExchangeMode.PIPELINED, // to sink after flatMap - DataExchangeMode.BATCH, // to coGroup (first input) - DataExchangeMode.BATCH, // to coGroup (second input) - DataExchangeMode.PIPELINED // to sink after coGroup - ); - } - - @Test - public void testBatchForced() { - // BATCH_FORCED should result in batching all the way - verifyBranchingJoiningPlan(ExecutionMode.BATCH_FORCED, - DataExchangeMode.BATCH, // to map - DataExchangeMode.PIPELINED, // to combiner connections are pipelined - DataExchangeMode.BATCH, // to reduce - DataExchangeMode.BATCH, // to filter - DataExchangeMode.BATCH, // to sink after reduce - DataExchangeMode.BATCH, // to join (first input) - DataExchangeMode.BATCH, // to join (second input) - DataExchangeMode.PIPELINED, // combiner connections are pipelined - DataExchangeMode.BATCH, // to other reducer - DataExchangeMode.BATCH, // to flatMap - DataExchangeMode.BATCH, // to sink after flatMap - DataExchangeMode.BATCH, // to coGroup (first input) - DataExchangeMode.BATCH, // to coGroup (second input) - DataExchangeMode.BATCH // to sink after coGroup - ); - } - - private void verifyBranchingJoiningPlan(ExecutionMode execMode, - DataExchangeMode toMap, - DataExchangeMode toReduceCombiner, - DataExchangeMode toReduce, - DataExchangeMode toFilter, - DataExchangeMode toReduceSink, - DataExchangeMode toJoin1, - DataExchangeMode toJoin2, - DataExchangeMode toOtherReduceCombiner, - DataExchangeMode toOtherReduce, - DataExchangeMode toFlatMap, - DataExchangeMode toFlatMapSink, - DataExchangeMode toCoGroup1, - DataExchangeMode toCoGroup2, - DataExchangeMode toCoGroupSink) - { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setExecutionMode(execMode); - - DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L) - .map(new MapFunction<Long, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Long value) { - return new Tuple2<Long, Long>(value, value); - } - }); - - DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>()); - reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("reduceSink"); - - DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() { - @Override - public boolean filter(Tuple2<Long, Long> value) throws Exception { - return false; - } - }); - - DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered) - .where(1).equalTo(1) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>()) - .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("flatMapSink"); - - joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>())) - .where(0).equalTo(0) - .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>()) - .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("cgSink"); - - - OptimizedPlan optPlan = compileNoStats(env.createProgramPlan()); - - SinkPlanNode reduceSink = findSink(optPlan.getDataSinks(), "reduceSink"); - SinkPlanNode flatMapSink = findSink(optPlan.getDataSinks(), "flatMapSink"); - SinkPlanNode cgSink = findSink(optPlan.getDataSinks(), "cgSink"); - - DualInputPlanNode coGroupNode = (DualInputPlanNode) cgSink.getPredecessor(); - - DualInputPlanNode joinNode = (DualInputPlanNode) coGroupNode.getInput1().getSource(); - SingleInputPlanNode otherReduceNode = (SingleInputPlanNode) coGroupNode.getInput2().getSource(); - SingleInputPlanNode otherReduceCombinerNode = (SingleInputPlanNode) otherReduceNode.getPredecessor(); - - SingleInputPlanNode reduceNode = (SingleInputPlanNode) joinNode.getInput1().getSource(); - SingleInputPlanNode reduceCombinerNode = (SingleInputPlanNode) reduceNode.getPredecessor(); - assertEquals(reduceNode, reduceSink.getPredecessor()); - - SingleInputPlanNode filterNode = (SingleInputPlanNode) joinNode.getInput2().getSource(); - assertEquals(filterNode, otherReduceCombinerNode.getPredecessor()); - - SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor(); - assertEquals(mapNode, reduceCombinerNode.getPredecessor()); - - SingleInputPlanNode flatMapNode = (SingleInputPlanNode) flatMapSink.getPredecessor(); - assertEquals(joinNode, flatMapNode.getPredecessor()); - - // verify the data exchange modes - - assertEquals(toReduceSink, reduceSink.getInput().getDataExchangeMode()); - assertEquals(toFlatMapSink, flatMapSink.getInput().getDataExchangeMode()); - assertEquals(toCoGroupSink, cgSink.getInput().getDataExchangeMode()); - - assertEquals(toCoGroup1, coGroupNode.getInput1().getDataExchangeMode()); - assertEquals(toCoGroup2, coGroupNode.getInput2().getDataExchangeMode()); - - assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode()); - assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode()); - - assertEquals(toOtherReduce, otherReduceNode.getInput().getDataExchangeMode()); - assertEquals(toOtherReduceCombiner, otherReduceCombinerNode.getInput().getDataExchangeMode()); - - assertEquals(toFlatMap, flatMapNode.getInput().getDataExchangeMode()); - - assertEquals(toFilter, filterNode.getInput().getDataExchangeMode()); - assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode()); - assertEquals(toReduceCombiner, reduceCombinerNode.getInput().getDataExchangeMode()); - - assertEquals(toMap, mapNode.getInput().getDataExchangeMode()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) { - for (SinkPlanNode node : collection) { - String nodeName = node.getOptimizerNode().getOperator().getName(); - if (nodeName != null && nodeName.equals(name)) { - return node; - } - } - - throw new IllegalArgumentException("No node with that name was found."); - } -}