Repository: flink
Updated Branches:
  refs/heads/master 1b6903216 -> 633b0d6a9


http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
new file mode 100644
index 0000000..8720aa7
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+/**
+* Tests that validate optimizer choices when using operators that are 
requesting certain specific execution
+* strategies.
+*/
+@SuppressWarnings("serial")
+public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
+       
+       private static final String JOIN_WITH_INVARIANT_NAME = "Test Join 
Invariant";
+       private static final String JOIN_WITH_SOLUTION_SET = "Test Join 
SolutionSet";
+       private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce 
Workset";
+       private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map 
Delta";
+
+       @Test
+       public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() {
+               try {
+                       Plan plan = getJavaTestPlan(false, true);
+                       
+                       OptimizedPlan oPlan = compileNoStats(plan);
+       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
+                       DualInputPlanNode joinWithInvariantNode = 
resolver.getNode(JOIN_WITH_INVARIANT_NAME);
+                       DualInputPlanNode joinWithSolutionSetNode = 
resolver.getNode(JOIN_WITH_SOLUTION_SET);
+                       SingleInputPlanNode worksetReducer = 
resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
+                       SingleInputPlanNode deltaMapper = 
resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
+                       
+                       // iteration preserves partitioning in reducer, so the 
first partitioning is out of the loop, 
+                       // the in-loop partitioning is before the final reducer
+                       
+                       // verify joinWithInvariant
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithInvariantNode.getInput1().getShipStrategy()); 
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithInvariantNode.getInput2().getShipStrategy());
+                       assertEquals(new FieldList(1, 2), 
joinWithInvariantNode.getKeysForInput1());
+                       assertEquals(new FieldList(1, 2), 
joinWithInvariantNode.getKeysForInput2());
+                       
+                       // verify joinWithSolutionSet
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput2().getShipStrategy());
+                       assertEquals(new FieldList(1, 0), 
joinWithSolutionSetNode.getKeysForInput1());
+                       
+                       
+                       // verify reducer
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
worksetReducer.getInput().getShipStrategy());
+                       assertEquals(new FieldList(1, 2), 
worksetReducer.getKeys(0));
+                       
+                       // currently, the system may partition before or after 
the mapper
+                       ShipStrategyType ss1 = 
deltaMapper.getInput().getShipStrategy();
+                       ShipStrategyType ss2 = 
deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
+                       
+                       assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == 
ShipStrategyType.PARTITION_HASH) ||
+                                               (ss2 == 
ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
+               
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test errored: " + e.getMessage());
+               }
+       }
+       
+       @Test
+       public void 
testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
+               try {
+                       Plan plan = getJavaTestPlan(false, false);
+                       
+                       OptimizedPlan oPlan = compileNoStats(plan);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
+                       DualInputPlanNode joinWithInvariantNode = 
resolver.getNode(JOIN_WITH_INVARIANT_NAME);
+                       DualInputPlanNode joinWithSolutionSetNode = 
resolver.getNode(JOIN_WITH_SOLUTION_SET);
+                       SingleInputPlanNode worksetReducer = 
resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
+                       
+                       // iteration preserves partitioning in reducer, so the 
first partitioning is out of the loop, 
+                       // the in-loop partitioning is before the final reducer
+                       
+                       // verify joinWithInvariant
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithInvariantNode.getInput1().getShipStrategy()); 
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithInvariantNode.getInput2().getShipStrategy());
+                       assertEquals(new FieldList(1, 2), 
joinWithInvariantNode.getKeysForInput1());
+                       assertEquals(new FieldList(1, 2), 
joinWithInvariantNode.getKeysForInput2());
+                       
+                       // verify joinWithSolutionSet
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput2().getShipStrategy());
+                       assertEquals(new FieldList(1, 0), 
joinWithSolutionSetNode.getKeysForInput1());
+                       
+                       // verify reducer
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
worksetReducer.getInput().getShipStrategy());
+                       assertEquals(new FieldList(1, 2), 
worksetReducer.getKeys(0));
+                       
+                       // verify solution delta
+                       assertEquals(2, 
joinWithSolutionSetNode.getOutgoingChannels().size());
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
+                       
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test errored: " + e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testJavaApiWithDirectSoltionSetUpdate() {
+               try {
+                       Plan plan = getJavaTestPlan(true, false);
+                       
+                       OptimizedPlan oPlan = compileNoStats(plan);
+       
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
+                       DualInputPlanNode joinWithInvariantNode = 
resolver.getNode(JOIN_WITH_INVARIANT_NAME);
+                       DualInputPlanNode joinWithSolutionSetNode = 
resolver.getNode(JOIN_WITH_SOLUTION_SET);
+                       SingleInputPlanNode worksetReducer = 
resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
+                       
+                       // iteration preserves partitioning in reducer, so the 
first partitioning is out of the loop, 
+                       // the in-loop partitioning is before the final reducer
+                       
+                       // verify joinWithInvariant
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithInvariantNode.getInput1().getShipStrategy()); 
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithInvariantNode.getInput2().getShipStrategy());
+                       assertEquals(new FieldList(1, 2), 
joinWithInvariantNode.getKeysForInput1());
+                       assertEquals(new FieldList(1, 2), 
joinWithInvariantNode.getKeysForInput2());
+                       
+                       // verify joinWithSolutionSet
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput2().getShipStrategy());
+                       assertEquals(new FieldList(1, 0), 
joinWithSolutionSetNode.getKeysForInput1());
+                       
+                       // verify reducer
+                       assertEquals(ShipStrategyType.FORWARD, 
worksetReducer.getInput().getShipStrategy());
+                       assertEquals(new FieldList(1, 2), 
worksetReducer.getKeys(0));
+                       
+                       
+                       // verify solution delta
+                       assertEquals(1, 
joinWithSolutionSetNode.getOutgoingChannels().size());
+                       assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
+                       
+                       new JobGraphGenerator().compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test errored: " + e.getMessage());
+               }
+       }
+       
+       
+       @Test
+       public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+                       
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> solutionSetInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> worksetInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> invariantInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant 
Input");
+                       
+                       DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, 
Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
+                       
+                       
+                       DataSet<Tuple3<Long, Long, Long>> result = 
+                       
+                       iter.getWorkset().join(invariantInput)
+                               .where(1, 2)
+                               .equalTo(1, 2)
+                               .with(new JoinFunction<Tuple3<Long,Long,Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+                                       public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
+                                               return first;
+                                       }
+                               });
+                       
+                       try {
+                       result.join(iter.getSolutionSet())
+                               .where(1, 0)
+                               .equalTo(0, 2)
+                               .with(new JoinFunction<Tuple3<Long, Long, 
Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+                                       public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
+                                               return second;
+                                       }
+                               });
+                               fail("The join should be rejected with key type 
mismatches.");
+                       }
+                       catch (InvalidProgramException e) {
+                               // expected!
+                       }
+                       
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test errored: " + e.getMessage());
+               }
+       }
+       
+       private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean 
mapBeforeSolutionDelta) {
+               
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> solutionSetInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> worksetInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
+               @SuppressWarnings("unchecked")
+               DataSet<Tuple3<Long, Long, Long>> invariantInput = 
env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant 
Input");
+               
+               DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, 
Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
+               
+               
+               DataSet<Tuple3<Long, Long, Long>> joinedWithSolutionSet = 
+               
+               iter.getWorkset().join(invariantInput)
+                       .where(1, 2)
+                       .equalTo(1, 2)
+                       .with(new RichJoinFunction<Tuple3<Long,Long,Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+                               public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
+                                       return first;
+                               }
+                       })
+                       .name(JOIN_WITH_INVARIANT_NAME)
+               
+               .join(iter.getSolutionSet())
+                       .where(1, 0)
+                       .equalTo(1, 2)
+                       .with(new RichJoinFunction<Tuple3<Long, Long, Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+                               public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
+                                       return second;
+                               }
+                       })
+                       .name(JOIN_WITH_SOLUTION_SET)
+                       .withForwardedFieldsSecond(joinPreservesSolutionSet ? 
new String[] {"0->0", "1->1", "2->2" } : null);
+                       
+               DataSet<Tuple3<Long, Long, Long>> nextWorkset = 
joinedWithSolutionSet.groupBy(1, 2)
+                       .reduceGroup(new 
RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
+                               public void reduce(Iterable<Tuple3<Long, Long, 
Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
+                       })
+                       .name(NEXT_WORKSET_REDUCER_NAME)
+                       .withForwardedFields("1->1","2->2","0->0");
+               
+               
+               DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = 
mapBeforeSolutionDelta ?
+                               joinedWithSolutionSet.map(new 
RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public 
Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } 
})
+                                       
.name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") :
+                               joinedWithSolutionSet;
+               
+               iter.closeWith(nextSolutionSet, nextWorkset)
+                       .print();
+               
+               return env.createProgramPlan();
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
new file mode 100644
index 0000000..23f8897
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoGroupGlobalPropertiesCompatibilityTest {
+
+       @Test
+       public void checkCompatiblePartitionings() {
+               try {
+                       final FieldList keysLeft = new FieldList(1, 4);
+                       final FieldList keysRight = new FieldList(3, 1);
+                       
+                       CoGroupDescriptor descr = new 
CoGroupDescriptor(keysLeft, keysRight);
+                       
+                       // test compatible hash partitioning
+                       {
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setHashPartitioned(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setHashPartitioned(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setHashPartitioned(keysLeft);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setHashPartitioned(keysRight);
+                               
+                               assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+                       
+                       // test compatible custom partitioning
+                       {
+                               Partitioner<Object> part = new 
Partitioner<Object>() {
+                                       @Override
+                                       public int partition(Object key, int 
numPartitions) {
+                                               return 0;
+                                       }
+                               };
+                               
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setCustomPartitioned(keysLeft, part);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setCustomPartitioned(keysRight, part);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setCustomPartitioned(keysLeft, part);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part);
+                               
+                               assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+                       
+                       // test custom partitioning matching any partitioning
+                       {
+                               Partitioner<Object> part = new 
Partitioner<Object>() {
+                                       @Override
+                                       public int partition(Object key, int 
numPartitions) {
+                                               return 0;
+                                       }
+                               };
+                               
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setAnyPartitioning(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setAnyPartitioning(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setCustomPartitioned(keysLeft, part);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part);
+                               
+                               assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void checkInompatiblePartitionings() {
+               try {
+                       final FieldList keysLeft = new FieldList(1);
+                       final FieldList keysRight = new FieldList(3);
+                       
+                       final Partitioner<Object> part = new 
Partitioner<Object>() {
+                               @Override
+                               public int partition(Object key, int 
numPartitions) {
+                                       return 0;
+                               }
+                       };
+                       final Partitioner<Object> part2 = new 
Partitioner<Object>() {
+                               @Override
+                               public int partition(Object key, int 
numPartitions) {
+                                       return 0;
+                               }
+                       };
+                       
+                       CoGroupDescriptor descr = new 
CoGroupDescriptor(keysLeft, keysRight);
+                       
+                       // test incompatible hash with custom partitioning
+                       {
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setAnyPartitioning(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setAnyPartitioning(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setHashPartitioned(keysLeft);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part);
+                               
+                               assertFalse(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+                       
+                       // test incompatible custom partitionings
+                       {
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setAnyPartitioning(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setAnyPartitioning(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setCustomPartitioned(keysLeft, part);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part2);
+                               
+                               assertFalse(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
new file mode 100644
index 0000000..e7807c9
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
+
+       @Test
+       public void testRejectCoGroupOnHashAndRangePartitioning() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+                       
+                       Configuration cfg = new Configuration();
+                       cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+                       
cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+                       
+                       input.coGroup(input).where(0).equalTo(0)
+                               .with(new DummyCoGroupFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>>())
+                               .withParameters(cfg)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       try {
+                               compileNoStats(p);
+                               fail("This should fail with an exception");
+                       }
+                       catch (CompilerException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
new file mode 100644
index 0000000..839f0a1
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinGlobalPropertiesCompatibilityTest {
+
+       @Test
+       public void checkCompatiblePartitionings() {
+               try {
+                       final FieldList keysLeft = new FieldList(1, 4);
+                       final FieldList keysRight = new FieldList(3, 1);
+                       
+                       SortMergeJoinDescriptor descr = new 
SortMergeJoinDescriptor(keysLeft, keysRight);
+                       
+                       // test compatible hash partitioning
+                       {
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setHashPartitioned(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setHashPartitioned(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setHashPartitioned(keysLeft);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setHashPartitioned(keysRight);
+                               
+                               assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+                       
+                       // test compatible custom partitioning
+                       {
+                               Partitioner<Object> part = new 
Partitioner<Object>() {
+                                       @Override
+                                       public int partition(Object key, int 
numPartitions) {
+                                               return 0;
+                                       }
+                               };
+                               
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setCustomPartitioned(keysLeft, part);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setCustomPartitioned(keysRight, part);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setCustomPartitioned(keysLeft, part);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part);
+                               
+                               assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+                       
+                       // test custom partitioning matching any partitioning
+                       {
+                               Partitioner<Object> part = new 
Partitioner<Object>() {
+                                       @Override
+                                       public int partition(Object key, int 
numPartitions) {
+                                               return 0;
+                                       }
+                               };
+                               
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setAnyPartitioning(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setAnyPartitioning(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setCustomPartitioned(keysLeft, part);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part);
+                               
+                               assertTrue(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void checkInompatiblePartitionings() {
+               try {
+                       final FieldList keysLeft = new FieldList(1);
+                       final FieldList keysRight = new FieldList(3);
+                       
+                       final Partitioner<Object> part = new 
Partitioner<Object>() {
+                               @Override
+                               public int partition(Object key, int 
numPartitions) {
+                                       return 0;
+                               }
+                       };
+                       final Partitioner<Object> part2 = new 
Partitioner<Object>() {
+                               @Override
+                               public int partition(Object key, int 
numPartitions) {
+                                       return 0;
+                               }
+                       };
+                       
+                       SortMergeJoinDescriptor descr = new 
SortMergeJoinDescriptor(keysLeft, keysRight);
+                       
+                       // test incompatible hash with custom partitioning
+                       {
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setAnyPartitioning(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setAnyPartitioning(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setHashPartitioned(keysLeft);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part);
+                               
+                               assertFalse(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+                       
+                       // test incompatible custom partitionings
+                       {
+                               RequestedGlobalProperties reqLeft = new 
RequestedGlobalProperties();
+                               reqLeft.setAnyPartitioning(keysLeft);
+                               RequestedGlobalProperties reqRight = new 
RequestedGlobalProperties();
+                               reqRight.setAnyPartitioning(keysRight);
+                               
+                               GlobalProperties propsLeft = new 
GlobalProperties();
+                               propsLeft.setCustomPartitioned(keysLeft, part);
+                               GlobalProperties propsRight = new 
GlobalProperties();
+                               propsRight.setCustomPartitioned(keysRight, 
part2);
+                               
+                               assertFalse(descr.areCompatible(reqLeft, 
reqRight, propsLeft, propsRight));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
new file mode 100644
index 0000000..9171cc7
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
+
+       @Test
+       public void testRejectJoinOnHashAndRangePartitioning() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+                       
+                       Configuration cfg = new Configuration();
+                       cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+                       
cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+                       
+                       input.join(input).where(0).equalTo(0)
+                               .withParameters(cfg)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       try {
+                               compileNoStats(p);
+                               fail("This should fail with an exception");
+                       }
+                       catch (CompilerException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
new file mode 100644
index 0000000..2c1574b
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.OperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.java.io.TextInputFormat;
+
+public class ChannelTest {
+       
+       @Test
+       public void testGetEstimatesNoReplicationFactor() {
+               final long NUM_RECORD = 1001;
+               final long SIZE = 467131;
+               
+               DataSourceNode source = getSourceNode();
+               SourcePlanNode planNode = new SourcePlanNode(source, "test 
node");
+               Channel channel = new Channel(planNode);
+
+               // no estimates here
+               Assert.assertEquals(-1, channel.getEstimatedOutputSize());
+               Assert.assertEquals(-1, channel.getEstimatedNumRecords());
+               
+               // set estimates
+               source.setEstimatedNumRecords(NUM_RECORD);
+               source.setEstimatedOutputSize(SIZE);
+               Assert.assertEquals(SIZE, channel.getEstimatedOutputSize());
+               Assert.assertEquals(NUM_RECORD, 
channel.getEstimatedNumRecords());
+       }
+       
+       @Test
+       public void testGetEstimatesWithReplicationFactor() {
+               final long NUM_RECORD = 1001;
+               final long SIZE = 467131;
+               
+               final int REPLICATION = 23;
+               
+               DataSourceNode source = getSourceNode();
+               SourcePlanNode planNode = new SourcePlanNode(source, "test 
node");
+               Channel channel = new Channel(planNode);
+               channel.setReplicationFactor(REPLICATION);
+
+               // no estimates here
+               Assert.assertEquals(-1, channel.getEstimatedOutputSize());
+               Assert.assertEquals(-1, channel.getEstimatedNumRecords());
+               
+               // set estimates
+               source.setEstimatedNumRecords(NUM_RECORD);
+               source.setEstimatedOutputSize(SIZE);
+               Assert.assertEquals(SIZE * REPLICATION, 
channel.getEstimatedOutputSize());
+               Assert.assertEquals(NUM_RECORD * REPLICATION, 
channel.getEstimatedNumRecords());
+       }
+       
+       
+//     private static final OptimizerNode getSingleInputNode() {
+//             return new MapNode(new MapOperatorBase<String, String, 
GenericMap<String,String>>(
+//                             new IdentityMapper<String>(),
+//                             new UnaryOperatorInformation<String, 
String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+//                             "map"));
+//     }
+       
+       private static final DataSourceNode getSourceNode() {
+               return new DataSourceNode(new GenericDataSourceBase<String, 
TextInputFormat>(
+                               new TextInputFormat(new Path("/ignored")), 
+                               new 
OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO),
+                               "source"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
new file mode 100644
index 0000000..366d10d
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plandump;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class NumberFormattingTest {
+
+       @Test
+       public void testFormatNumberNoDigit() {
+               assertEquals("0.0", PlanJSONDumpGenerator.formatNumber(0));
+               assertEquals("0.00", 
PlanJSONDumpGenerator.formatNumber(0.0000000001));
+               assertEquals("-1.0", PlanJSONDumpGenerator.formatNumber(-1.0));
+               assertEquals("1.00", PlanJSONDumpGenerator.formatNumber(1));
+               assertEquals("17.00", PlanJSONDumpGenerator.formatNumber(17));
+               assertEquals("17.44", 
PlanJSONDumpGenerator.formatNumber(17.44));
+               assertEquals("143.00", PlanJSONDumpGenerator.formatNumber(143));
+               assertEquals("143.40", 
PlanJSONDumpGenerator.formatNumber(143.4));
+               assertEquals("143.50", 
PlanJSONDumpGenerator.formatNumber(143.5));
+               assertEquals("143.60", 
PlanJSONDumpGenerator.formatNumber(143.6));
+               assertEquals("143.45", 
PlanJSONDumpGenerator.formatNumber(143.45));
+               assertEquals("143.55", 
PlanJSONDumpGenerator.formatNumber(143.55));
+               assertEquals("143.65", 
PlanJSONDumpGenerator.formatNumber(143.65));
+               assertEquals("143.66", 
PlanJSONDumpGenerator.formatNumber(143.655));
+               
+               assertEquals("1.13 K", 
PlanJSONDumpGenerator.formatNumber(1126.0));
+               assertEquals("11.13 K", 
PlanJSONDumpGenerator.formatNumber(11126.0));
+               assertEquals("118.13 K", 
PlanJSONDumpGenerator.formatNumber(118126.0));
+
+               assertEquals("1.44 M", 
PlanJSONDumpGenerator.formatNumber(1435126.0));
+       }
+       
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
new file mode 100644
index 0000000..7fea8a6
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public class DummyCoGroupFunction<L, R> extends RichCoGroupFunction<L, R, 
Tuple2<L, R>> {
+       
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void coGroup(Iterable<L> first, Iterable<R> second, 
Collector<Tuple2<L, R>> out) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
new file mode 100644
index 0000000..6be8a24
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.util.Collector;
+
+public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void join(T first, T second, Collector<T> out) {
+               out.collect(null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
new file mode 100644
index 0000000..44d3695
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+
+public class DummyReducer<T> extends RichReduceFunction<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T reduce(T a, T b) {
+               return a;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
new file mode 100644
index 0000000..0316463
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityFlatMapper<T> implements FlatMapFunction<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void flatMap(T value, Collector<T> out) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
new file mode 100644
index 0000000..11fd044
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+
+@Combinable
+public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void reduce(Iterable<T> values, Collector<T> out) {
+               for (T next : values) {
+                       out.collect(next);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
new file mode 100644
index 0000000..f335846
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class IdentityKeyExtractor<T> implements KeySelector<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T getKey(T value) {
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
new file mode 100644
index 0000000..025b4d8
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+
+public class IdentityMapper<T> extends RichMapFunction<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T map(T value) {
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
new file mode 100644
index 0000000..6efbef1
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityPartitionerMapper<T> extends RichMapPartitionFunction<T, 
T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void mapPartition(Iterable<T> values, Collector<T> out) {
+               for (T in : values) {
+                       out.collect(in);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
new file mode 100644
index 0000000..39c0e1b
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+
+public class SelectOneReducer<T> extends RichReduceFunction<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T reduce(T value1, T value2) throws Exception {
+               return value1;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
new file mode 100644
index 0000000..48d13ca
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+
+@Combinable
+public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void reduce(Iterable<T> values, Collector<T> out) {
+               out.collect(values.iterator().next());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
new file mode 100644
index 0000000..6a84c44
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("deprecation")
+public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void coGroup(Iterator<Record> records1, Iterator<Record> 
records2, Collector<Record> out) {
+               while (records1.hasNext()) {
+                       out.collect(records1.next());
+               }
+
+               while (records2.hasNext()) {
+                       out.collect(records2.next());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
new file mode 100644
index 0000000..8ee2285
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import org.apache.flink.api.java.record.functions.CrossFunction;
+import org.apache.flink.types.Record;
+
+@SuppressWarnings("deprecation")
+public class DummyCrossStub extends CrossFunction {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public Record cross(Record first, Record second) throws Exception {
+               return first;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
new file mode 100644
index 0000000..0c816e7
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.record.io.DelimitedInputFormat;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+
+public final class DummyInputFormat extends DelimitedInputFormat {
+       private static final long serialVersionUID = 1L;
+       
+       private final IntValue integer = new IntValue(1);
+
+       @Override
+       public Record readRecord(Record target, byte[] bytes, int offset, int 
numBytes) {
+               target.setField(0, this.integer);
+               target.setField(1, this.integer);
+               return target;
+       }
+
+       @Override
+       public FileBaseStatistics getStatistics(BaseStatistics 
cachedStatistics) {
+               return (cachedStatistics instanceof FileBaseStatistics) ? 
(FileBaseStatistics) cachedStatistics : null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
new file mode 100644
index 0000000..d00be6e
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import 
org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("deprecation")
+@ConstantFieldsFirstExcept({})
+public class DummyMatchStub extends JoinFunction implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void join(Record value1, Record value2, Collector<Record> out) 
throws Exception {
+               out.collect(value1);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
new file mode 100644
index 0000000..444b48e
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("deprecation")
+public class DummyNonPreservingMatchStub extends JoinFunction implements 
Serializable {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void join(Record value1, Record value2, Collector<Record> out) 
throws Exception {
+               out.collect(value1);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
new file mode 100644
index 0000000..1bbe24c
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.util;
+
+
+import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
+import org.apache.flink.types.Record;
+
+
+public final class DummyOutputFormat extends DelimitedOutputFormat {
+       private static final long serialVersionUID = 1L;
+       
+       @Override
+       public int serializeRecord(Record rec, byte[] target) throws Exception {
+               return 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
new file mode 100644
index 0000000..cccc6cb
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.record.functions.MapFunction;
+import 
org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("deprecation")
+@ConstantFieldsExcept({})
+public final class IdentityMap extends MapFunction implements Serializable {
+       private static final long serialVersionUID = 1L;
+       
+       @Override
+       public void map(Record record, Collector<Record> out) throws Exception {
+               out.collect(record);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
new file mode 100644
index 0000000..f45745d
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import 
org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("deprecation")
+@ConstantFieldsExcept({})
+public final class IdentityReduce extends ReduceFunction implements 
Serializable {
+       private static final long serialVersionUID = 1L;
+       
+       @Override
+       public void reduce(Iterator<Record> records, Collector<Record> out) 
throws Exception {
+               while (records.hasNext()) {
+                       out.collect(records.next());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/log4j-test.properties 
b/flink-optimizer/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-optimizer/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/log4j.properties 
b/flink-optimizer/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fa3f937
--- /dev/null
+++ b/flink-optimizer/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target  = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/resources/logback-test.xml 
b/flink-optimizer/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-optimizer/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index a59bde4..bf44058 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -109,7 +109,7 @@ under the License.
                                                                        
<exclude>org.apache.flink:flink-java</exclude>
                                                                        
<exclude>org.apache.flink:flink-scala</exclude>
                                                                        
<exclude>org.apache.flink:flink-runtime</exclude>
-                                                                       
<exclude>org.apache.flink:flink-compiler</exclude>
+                                                                       
<exclude>org.apache.flink:flink-optimizer</exclude>
                                                                        
<exclude>org.apache.flink:flink-spargel</exclude>
                                                                        
<exclude>org.apache.flink:flink-avro</exclude>
                                                                        
<exclude>org.apache.flink:flink-java-examples</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 43eec6d..1c89170 100644
--- 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -113,7 +113,7 @@ under the License.
                                                                        
<exclude>org.apache.flink:flink-java</exclude>
                                                                        
<exclude>org.apache.flink:flink-scala</exclude>
                                                                        
<exclude>org.apache.flink:flink-runtime</exclude>
-                                                                       
<exclude>org.apache.flink:flink-compiler</exclude>
+                                                                       
<exclude>org.apache.flink:flink-optimizer</exclude>
                                                                        
<exclude>org.apache.flink:flink-spargel</exclude>
                                                                        
<exclude>org.apache.flink:flink-avro</exclude>
                                                                        
<exclude>org.apache.flink:flink-java-examples</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 523237e..f350062 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -47,7 +47,7 @@ under the License.
                
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-compiler</artifactId>
+                       <artifactId>flink-optimizer</artifactId>
                        <version>${project.version}</version>
                </dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-staging/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/pom.xml 
b/flink-staging/flink-streaming/pom.xml
index 43b8181..e7c23db 100644
--- a/flink-staging/flink-streaming/pom.xml
+++ b/flink-staging/flink-streaming/pom.xml
@@ -49,12 +49,6 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-compiler</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime</artifactId>
                        <version>${project.version}</version>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index e8da4e9..467fb44 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -42,7 +42,7 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-compiler</artifactId>
+                       <artifactId>flink-optimizer</artifactId>
                        <version>${project.version}</version>
                </dependency>
                <dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 7958b14..1670d85 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -46,7 +46,7 @@ under the License.
                
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-compiler</artifactId>
+                       <artifactId>flink-optimizer</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e771ad4..11cca1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@ under the License.
                <module>flink-java</module>
                <module>flink-scala</module>
                <module>flink-runtime</module>
-               <module>flink-compiler</module>
+               <module>flink-optimizer</module>
                <module>flink-examples</module>
                <module>flink-clients</module>
                <module>flink-tests</module>

Reply via email to