http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java new file mode 100644 index 0000000..e65758f --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; +import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class NestedIterationsTest extends CompilerTestBase { + + @Test + public void testRejectNestedBulkIterations() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> data = env.generateSequence(1, 100); + + IterativeDataSet<Long> outerIteration = data.iterate(100); + + IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100); + + DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>())); + + DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>())); + + outerResult.print(); + + Plan p = env.createProgramPlan(); + + try { + compileNoStats(p); + } + catch (CompilerException e) { + assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testRejectNestedWorksetIterations() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0); + + DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, 
Long>>()); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0); + + DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>()); + + DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult); + + outerResult.print(); + + Plan p = env.createProgramPlan(); + + try { + compileNoStats(p); + } + catch (CompilerException e) { + assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testBulkIterationInClosure() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> data1 = env.generateSequence(1, 100); + DataSet<Long> data2 = env.generateSequence(1, 100); + + IterativeDataSet<Long> firstIteration = data1.iterate(100); + + DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>())); + + + IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100); + + DataSet<Long> joined = mainIteration.join(firstResult) + .where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>()) + .with(new DummyFlatJoinFunction<Long>()); + + DataSet<Long> mainResult = mainIteration.closeWith(joined); + + mainResult.print(); + + Plan p = env.createProgramPlan(); + + // optimizer should be able to translate this + OptimizedPlan op = compileNoStats(p); + + // job graph generator should be able to translate this + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testDeltaIterationInClosure() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); + DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0); + + DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>()); + + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0); + + DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0) + .projectFirst(0).projectSecond(0); + + DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined); + + mainResult.print(); + + Plan p = env.createProgramPlan(); + + // optimizer should be able to translate this + OptimizedPlan op = compileNoStats(p); + + // job graph generator should be able to translate this + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +}
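A note on the two rejection tests in this file: the assertion on the CompilerException message runs only inside the catch block, so a plan that compiles without error would pass silently. A minimal sketch of a stricter variant (using the same JUnit 4 assertions and CompilerTestBase helpers as above; the failure message is illustrative, not part of this commit):

    try {
        compileNoStats(p);
        fail("Optimizer did not reject the nested iteration");
    }
    catch (CompilerException e) {
        // equivalent to the indexOf(...) != -1 check used above
        assertTrue(e.getMessage().toLowerCase().contains("nested iterations"));
    }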
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java new file mode 100644 index 0000000..2b42f85 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings("serial") +public class PartitionPushdownTest extends CompilerTestBase { + + @Test + public void testPartitioningNotPushedDown() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); + + input + .groupBy(0, 1).sum(2) + .groupBy(0).sum(1) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + + SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode agg2Combiner = (SingleInputPlanNode) agg2Reducer.getInput().getSource(); + SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Combiner.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_HASH, agg2Reducer.getInput().getShipStrategy()); + assertEquals(new FieldList(0), agg2Reducer.getInput().getShipStrategyKeys()); + + assertEquals(ShipStrategyType.FORWARD, agg2Combiner.getInput().getShipStrategy()); + + assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy()); + assertEquals(new FieldList(0, 1), agg1Reducer.getInput().getShipStrategyKeys()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitioningReused() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, 
Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L)); + + input + .groupBy(0).sum(1) + .groupBy(0, 1).sum(2) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + + SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, agg2Reducer.getInput().getShipStrategy()); + + assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy()); + assertEquals(new FieldList(0), agg1Reducer.getInput().getShipStrategyKeys()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java new file mode 100644 index 0000000..16684dc --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java @@ -0,0 +1,845 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings("serial") +public class PartitioningReusageTest extends CompilerTestBase { + + @Test + public void noPreviousPartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void noPreviousPartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void reuseSinglePartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); 
+ OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void reuseSinglePartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2.partitionByHash(2) + .map(new MockMapper()) 
+ .withForwardedFields("2"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2.partitionByHash(0,1) + .map(new MockMapper()) + .withForwardedFields("0;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + + @Test + public void reuseBothPartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2.partitionByHash(1,2) + .map(new MockMapper()) + .withForwardedFields("1;2"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2.partitionByHash(2,1) + .map(new MockMapper()) + .withForwardedFields("2;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin4() { + ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,2) + .map(new MockMapper()).withForwardedFields("0;2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(1,2).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin7() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(1,2).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + + @Test + public void noPreviousPartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2) + .where(0,1).equalTo(0,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void noPreviousPartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void reuseSinglePartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2) + .where(0,1).equalTo(0,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void reuseSinglePartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2) + 
.where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1")) + .where(0,1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .coGroup(set2) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2")) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + 
.partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2.partitionByHash(0, 1) + .map(new MockMapper()) + .withForwardedFields("0;1")) + .where(0, 1).equalTo(0, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + + @Test + public void reuseBothPartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2.partitionByHash(1, 2) + .map(new MockMapper()) + .withForwardedFields("1;2")) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .coGroup(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1")) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,2) + .map(new MockMapper()).withForwardedFields("0;2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup5() { + 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2")) + .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup7() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + + + private void checkValidJoinInputProperties(DualInputPlanNode join) { + + GlobalProperties inProps1 = join.getInput1().getGlobalProperties(); + GlobalProperties inProps2 = join.getInput2().getGlobalProperties(); + + if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // check that both inputs are hash partitioned on the same fields + FieldList pFields1 = inProps1.getPartitioningFields(); + FieldList pFields2 = inProps2.getPartitioningFields(); + + assertTrue("Inputs are not the same number of fields. 
Input 1: "+pFields1+", Input 2: "+pFields2, + pFields1.size() == pFields2.size()); + + FieldList reqPFields1 = join.getKeysForInput1(); + FieldList reqPFields2 = join.getKeysForInput2(); + + for(int i=0; i<pFields1.size(); i++) { + + // get fields + int f1 = pFields1.get(i); + int f2 = pFields2.get(i); + + // check that field positions in original key field list are identical + int pos1 = getPosInFieldList(f1, reqPFields1); + int pos2 = getPosInFieldList(f2, reqPFields2); + + if(pos1 < 0) { + fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1); + } + if(pos2 < 0) { + fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2); + } + if(pos1 != pos2) { + fail("Inputs are not partitioned on the same key fields"); + } + } + + } + else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION && + inProps2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { + // we are good. No need to check for fields + } + else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED && + inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) { + // we are good. No need to check for fields + } + else { + throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroupinputs"); + } + + } + + private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) { + + GlobalProperties inProps1 = coGroup.getInput1().getGlobalProperties(); + GlobalProperties inProps2 = coGroup.getInput2().getGlobalProperties(); + + if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // check that both inputs are hash partitioned on the same fields + FieldList pFields1 = inProps1.getPartitioningFields(); + FieldList pFields2 = inProps2.getPartitioningFields(); + + assertTrue("Inputs are not the same number of fields. 
Input 1: "+pFields1+", Input 2: "+pFields2, + pFields1.size() == pFields2.size()); + + FieldList reqPFields1 = coGroup.getKeysForInput1(); + FieldList reqPFields2 = coGroup.getKeysForInput2(); + + for(int i=0; i<pFields1.size(); i++) { + + // get fields + int f1 = pFields1.get(i); + int f2 = pFields2.get(i); + + // check that field positions in original key field list are identical + int pos1 = getPosInFieldList(f1, reqPFields1); + int pos2 = getPosInFieldList(f2, reqPFields2); + + if(pos1 < 0) { + fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1); + } + if(pos2 < 0) { + fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2); + } + if(pos1 != pos2) { + fail("Inputs are not partitioned on the same key fields"); + } + } + + } + else { + throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs"); + } + + } + + private int getPosInFieldList(int field, FieldList list) { + + int pos; + for(pos=0; pos<list.size(); pos++) { + if(field == list.get(pos)) { + break; + } + } + if(pos == list.size()) { + return -1; + } else { + return pos; + } + + } + + + + public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + @Override + public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception { + return null; + } + } + + public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>, + Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + + @Override + public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception { + return null; + } + } + + public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>, + Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + + @Override + public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second, + Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception { + + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java new file mode 100644 index 0000000..86f01b0 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer; + +import static org.junit.Assert.*; + +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.optimizer.testfunctions.SelectOneReducer; +import org.junit.Test; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.optimizer.plan.BulkIterationPlanNode; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.configuration.Configuration; + +@SuppressWarnings("serial") +public class PipelineBreakerTest extends CompilerTestBase { + + @Test + public void testPipelineBreakerWithBroadcastVariable() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>()); + + DataSet<Long> result = source.map(new IdentityMapper<Long>()) + .map(new IdentityMapper<Long>()) + .withBroadcastSet(source, "bc"); + + result.print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); + + assertTrue(mapper.getInput().getTempMode().breaksPipeline()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPipelineBreakerBroadcastedAllReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>()); + + DataSet<Long> bcInput1 = sourceWithMapper + .map(new IdentityMapper<Long>()) + .reduce(new SelectOneReducer<Long>()); + DataSet<Long> bcInput2 = env.generateSequence(1, 10); + + DataSet<Long> result = sourceWithMapper + .map(new IdentityMapper<Long>()) + .withBroadcastSet(bcInput1, "bc1") + .withBroadcastSet(bcInput2, "bc2"); + + result.print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); + + assertTrue(mapper.getInput().getTempMode().breaksPipeline()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPipelineBreakerBroadcastedPartialSolution() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + + DataSet<Long> initialSource = env.generateSequence(1, 10); + IterativeDataSet<Long> iteration = initialSource.iterate(100); + + + DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>()); + + DataSet<Long> bcInput1 = sourceWithMapper + .map(new IdentityMapper<Long>()) + .reduce(new SelectOneReducer<Long>()); + + DataSet<Long> result = sourceWithMapper + .map(new IdentityMapper<Long>()) + .withBroadcastSet(iteration, "bc2") + .withBroadcastSet(bcInput1, "bc1"); + + + iteration.closeWith(result).print(); + + Plan p = 
env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource(); + SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction(); + + assertTrue(mapper.getInput().getTempMode().breaksPipeline()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPipelineBreakerWithCross() { + try { + { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + DataSet<Long> initialSource = env.generateSequence(1, 10); + + Configuration conf = new Configuration(); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST); + initialSource + .map(new IdentityMapper<Long>()) + .cross(initialSource).withParameters(conf) + .print(); + + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode cross = (DualInputPlanNode) sink.getInput().getSource(); + + assertTrue(cross.getInput1().getTempMode().breaksPipeline()); + } + + { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + DataSet<Long> initialSource = env.generateSequence(1, 10); + + Configuration conf = new Configuration(); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND); + initialSource + .map(new IdentityMapper<Long>()) + .cross(initialSource).withParameters(conf) + .print(); + + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode cross = (DualInputPlanNode) sink.getInput().getSource(); + + assertTrue(cross.getInput2().getTempMode().breaksPipeline()); + } + + { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + DataSet<Long> initialSource = env.generateSequence(1, 10); + + Configuration conf = new Configuration(); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST); + initialSource + .map(new IdentityMapper<Long>()) + .cross(initialSource).withParameters(conf) + .print(); + + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode cross = (DualInputPlanNode) sink.getInput().getSource(); + + assertTrue(cross.getInput1().getTempMode().breaksPipeline()); + } + + { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(64); + + DataSet<Long> initialSource = env.generateSequence(1, 10); + + Configuration conf = new Configuration(); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND); + initialSource + .map(new IdentityMapper<Long>()) + .cross(initialSource).withParameters(conf) + .print(); + + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode cross = (DualInputPlanNode) sink.getInput().getSource(); + + assertTrue(cross.getInput2().getTempMode().breaksPipeline()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} 
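The four blocks in testPipelineBreakerWithCross differ only in the nested-loop strategy hint and in which input of the cross is expected to break the pipeline. A compact helper could cover all four cases; this is a sketch only (the helper name and parameters are made up, not part of this commit), reusing the imports already present in the file plus org.apache.flink.optimizer.plan.Channel:

    private void checkCrossBreaksPipeline(String strategyHint, boolean breakerOnFirstInput) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setDegreeOfParallelism(64);

        DataSet<Long> initialSource = env.generateSequence(1, 10);

        // pin the cross to the given nested-loop local strategy
        Configuration conf = new Configuration();
        conf.setString(Optimizer.HINT_LOCAL_STRATEGY, strategyHint);

        initialSource
            .map(new IdentityMapper<Long>())
            .cross(initialSource).withParameters(conf)
            .print();

        OptimizedPlan op = compileNoStats(env.createProgramPlan());
        SinkPlanNode sink = op.getDataSinks().iterator().next();
        DualInputPlanNode cross = (DualInputPlanNode) sink.getInput().getSource();

        // the input on the hinted outer side must be dammed to avoid a pipeline deadlock
        Channel checkedInput = breakerOnFirstInput ? cross.getInput1() : cross.getInput2();
        assertTrue(checkedInput.getTempMode().breaksPipeline());
    }

For example, the first block above would become checkCrossBreaksPipeline(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST, true).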
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java new file mode 100644 index 0000000..7be2b16 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java @@ -0,0 +1,897 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.plan.NAryUnionPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plan.SourcePlanNode; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +@SuppressWarnings({"serial"}) +public class PropertyDataSourceTest extends CompilerTestBase { + + private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new ArrayList<Tuple3<Long, SomePojo, String>>(); + private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = new TupleTypeInfo<Tuple3<Long, SomePojo, String>>( + BasicTypeInfo.LONG_TYPE_INFO, + TypeExtractor.createTypeInfo(SomePojo.class), + BasicTypeInfo.STRING_TYPE_INFO + ); + + @Test + public void checkSinglePartitionedSource1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = 
oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1, 0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource3() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("*"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1, 2, 3, 4))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource4() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = 
sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 3))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource5() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.stringField"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(3))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource6() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField; f2"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource7() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy("byDate", 1, 0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() 
== PartitioningProperty.CUSTOM_PARTITIONING); + Assert.assertTrue(gprops.getCustomPartitioner() != null); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0) + .splitsGroupedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0) + .splitsGroupedBy(1, 0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedGroupedSource3() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1) + .splitsGroupedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1))); + 
Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource4() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0, 1) + .splitsGroupedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource5() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f2") + .splitsGroupedBy("f2"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedGroupedSource6() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsGroupedBy("f0; f1.intField"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2))); + Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedGroupedSource7() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsGroupedBy("f1"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource8() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1") + .splitsGroupedBy("f1.stringField"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedOrderedSource1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1) + .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1))); + 
Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedOrderedSource2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1) + .splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedOrderedSource3() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0) + .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedOrderedSource4() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0, 1) + .splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + 
Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedOrderedSource5() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedOrderedSource6() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsOrderedBy("f1", new Order[]{Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedOrderedSource7() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1") + .splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties 
gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkCoPartitionedSources1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data1 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data1.getSplitDataProperties() + .splitsPartitionedBy("byDate", 0); + + DataSource<Tuple2<Long, String>> data2 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data2.getSplitDataProperties() + .splitsPartitionedBy("byDate", 0); + + data1.union(data2).print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource(); + SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource(); + + GlobalProperties gprops1 = sourceNode1.getGlobalProperties(); + LocalProperties lprops1 = sourceNode1.getLocalProperties(); + GlobalProperties gprops2 = sourceNode2.getGlobalProperties(); + LocalProperties lprops2 = sourceNode2.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING); + Assert.assertTrue(lprops1.getGroupedFields() == null); + Assert.assertTrue(lprops1.getOrdering() == null); + + Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING); + Assert.assertTrue(lprops2.getGroupedFields() == null); + Assert.assertTrue(lprops2.getOrdering() == null); + + Assert.assertTrue(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner())); + } + + @Test + public void checkCoPartitionedSources2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data1 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data1.getSplitDataProperties() + .splitsPartitionedBy("byCountry", 0); + + DataSource<Tuple2<Long, String>> data2 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data2.getSplitDataProperties() + .splitsPartitionedBy("byDate", 0); + + data1.union(data2).print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource(); + SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource(); + + GlobalProperties 
gprops1 = sourceNode1.getGlobalProperties(); + LocalProperties lprops1 = sourceNode1.getLocalProperties(); + GlobalProperties gprops2 = sourceNode2.getGlobalProperties(); + LocalProperties lprops2 = sourceNode2.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING); + Assert.assertTrue(lprops1.getGroupedFields() == null); + Assert.assertTrue(lprops1.getOrdering() == null); + + Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING); + Assert.assertTrue(lprops2.getGroupedFields() == null); + Assert.assertTrue(lprops2.getOrdering() == null); + + Assert.assertTrue(!gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner())); + } + + + public static class SomePojo { + public double doubleField; + public int intField; + public String stringField; + } + +} + +
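Nearly every test in PropertyDataSourceTest closes with the same four assertions over the source node's global and local properties. A condensed variant is sketched below; the helper name assertSourceProperties is illustrative and not part of the commit, and it assumes JUnit's assertEquals/assertNull are acceptable replacements for the assertTrue comparisons used above:

    // Sketch only: checks partitioning type, partitioning fields, grouped
    // fields, and the (always absent) ordering of a source plan node.
    private static void assertSourceProperties(
            SourcePlanNode sourceNode,
            PartitioningProperty expectedPartitioning,
            FieldSet expectedPartitioningFields,
            FieldSet expectedGroupedFields) {

        GlobalProperties gprops = sourceNode.getGlobalProperties();
        LocalProperties lprops = sourceNode.getLocalProperties();

        Assert.assertEquals(expectedPartitioning, gprops.getPartitioning());
        Assert.assertEquals(expectedPartitioningFields,
                new FieldSet(gprops.getPartitioningFields().toArray()));

        if (expectedGroupedFields == null) {
            Assert.assertNull(lprops.getGroupedFields());
        } else {
            Assert.assertEquals(expectedGroupedFields,
                    new FieldSet(lprops.getGroupedFields().toArray()));
        }
        Assert.assertNull(lprops.getOrdering());
    }

checkSinglePartitionedGroupedSource1, for example, would then close with assertSourceProperties(sourceNode, PartitioningProperty.ANY_PARTITIONING, new FieldSet(0), new FieldSet(0)), while the co-partitioned tests would keep their extra getCustomPartitioner() checks.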