http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
new file mode 100644
index 0000000..5175d8c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test verifies that the optimizer assigns the correct
+ * data exchange mode to a simple forward / shuffle plan.
+ *
+ * <pre>
+ *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeForwardTest extends CompilerTestBase {
+
+
+       @Test
+       public void testPipelinedForced() {
+               // PIPELINED_FORCED should result in pipelining all the way
+               verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+       }
+
+       @Test
+       public void testPipelined() {
+               // PIPELINED should result in pipelining all the way
+               verifySimpleForwardPlan(ExecutionMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+       }
+
+       @Test
+       public void testBatch() {
+               // BATCH should result in batching the shuffle all the way
+               verifySimpleForwardPlan(ExecutionMode.BATCH,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
+       }
+
+       @Test
+       public void testBatchForced() {
+               // BATCH_FORCED should result in batching all the way
+               verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED,
+                               DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+                               DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.BATCH, DataExchangeMode.BATCH);
+       }
+
+       private void verifySimpleForwardPlan(ExecutionMode execMode,
+                                                                               DataExchangeMode toMap,
+                                                                               DataExchangeMode toFilter,
+                                                                               DataExchangeMode toKeyExtractor,
+                                                                               DataExchangeMode toCombiner,
+                                                                               DataExchangeMode toReduce,
+                                                                               DataExchangeMode toSink)
+       {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.getConfig().setExecutionMode(execMode);
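+                       // the ExecutionMode set here is what the optimizer translates into the
+                       // per-connection DataExchangeModes asserted below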
+
+                       DataSet<String> dataSet = env.readTextFile("/never/accessed");
+                       dataSet
+                               .map(new MapFunction<String, Integer>() {
+                                       @Override
+                                       public Integer map(String value) {
+                                               return 0;
+                                       }
+                               })
+                               .filter(new FilterFunction<Integer>() {
+                                       @Override
+                                       public boolean filter(Integer value) {
+                                               return false;
+                                       }
+                               })
+                               .groupBy(new IdentityKeyExtractor<Integer>())
+                               .reduceGroup(new Top1GroupReducer<Integer>())
+                               .output(new DiscardingOutputFormat<Integer>());
+
+                       OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+                       SinkPlanNode sinkNode = optPlan.getDataSinks().iterator().next();
+
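+                       // walk the plan back from the sink; the optimizer places a combine node
+                       // between the group reduce and the key extractor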
+                       SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getPredecessor();
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+                       SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode) combineNode.getPredecessor();
+
+                       SingleInputPlanNode filterNode = (SingleInputPlanNode) keyExtractorNode.getPredecessor();
+                       SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+                       assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+                       assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+                       assertEquals(toKeyExtractor, keyExtractorNode.getInput().getDataExchangeMode());
+                       assertEquals(toCombiner, combineNode.getInput().getDataExchangeMode());
+                       assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
+                       assertEquals(toSink, sinkNode.getInput().getDataExchangeMode());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
new file mode 100644
index 0000000..6b2691a
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, but do not re-join the branches.
+ *
+ * <pre>
+ *                      /---> (filter) -> (sink)
+ *                     /
+ *                    /
+ * (source) -> (map) -----------------\
+ *                    \               (join) -> (sink)
+ *                     \   (source) --/
+ *                      \
+ *                       \
+ *                        \-> (sink)
+ * </pre>
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class DataExchangeModeOpenBranchingTest extends CompilerTestBase {
+
+       @Test
+       public void testPipelinedForced() {
+               // PIPELINED_FORCED should result in pipelining all the way
+               verifyBranchingPlan(ExecutionMode.PIPELINED_FORCED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED);
+       }
+
+       @Test
+       public void testPipelined() {
+               // PIPELINED should result in pipelining all the way
+               verifyBranchingPlan(ExecutionMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED);
+       }
+
+       @Test
+       public void testBatch() {
+               // BATCH should result in batching the shuffle all the way
+               verifyBranchingPlan(ExecutionMode.BATCH,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED, DataExchangeMode.BATCH,
+                               DataExchangeMode.BATCH, DataExchangeMode.PIPELINED,
+                               DataExchangeMode.PIPELINED);
+       }
+
+       @Test
+       public void testBatchForced() {
+               // BATCH_FORCED should result in batching all the way
+               verifyBranchingPlan(ExecutionMode.BATCH_FORCED,
+                               DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+                               DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+                               DataExchangeMode.BATCH, DataExchangeMode.BATCH,
+                               DataExchangeMode.BATCH);
+       }
+
+       private void verifyBranchingPlan(ExecutionMode execMode,
+                                                                       DataExchangeMode toMap,
+                                                                       DataExchangeMode toFilter,
+                                                                       DataExchangeMode toFilterSink,
+                                                                       DataExchangeMode toJoin1,
+                                                                       DataExchangeMode toJoin2,
+                                                                       DataExchangeMode toJoinSink,
+                                                                       DataExchangeMode toDirectSink)
+       {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.getConfig().setExecutionMode(execMode);
+
+                       DataSet<Tuple2<Long, Long>> data = env.generateSequence(1, 100000)
+                                       .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+                                               @Override
+                                               public Tuple2<Long, Long> map(Long value) {
+                                                       return new Tuple2<Long, Long>(value, value);
+                                               }
+                                       });
+
+                       // output 1
+                       data
+                                       .filter(new FilterFunction<Tuple2<Long, Long>>() {
+                                               @Override
+                                               public boolean filter(Tuple2<Long, Long> value) {
+                                                       return false;
+                                               }
+                                       })
+                                       .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink1");
+
+                       // output 2 does a join before the sink
+                       data
+                                       .join(env.fromElements(new Tuple2<Long, Long>(1L, 2L)))
+                                       .where(1)
+                                       .equalTo(0)
+                                       .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("sink2");
+
+                       // output 3 is direct
+                       data
+                                       .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("sink3");
+
+                       OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+                       SinkPlanNode filterSink = findSink(optPlan.getDataSinks(), "sink1");
+                       SinkPlanNode joinSink = findSink(optPlan.getDataSinks(), "sink2");
+                       SinkPlanNode directSink = findSink(optPlan.getDataSinks(), "sink3");
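+                       // the sinks were named above so they can be identified in the optimized plan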
+
+                       SingleInputPlanNode filterNode = (SingleInputPlanNode) filterSink.getPredecessor();
+                       SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+
+                       DualInputPlanNode joinNode = (DualInputPlanNode) joinSink.getPredecessor();
+                       assertEquals(mapNode, joinNode.getInput1().getSource());
+
+                       assertEquals(mapNode, directSink.getPredecessor());
+
+                       assertEquals(toFilterSink, filterSink.getInput().getDataExchangeMode());
+                       assertEquals(toJoinSink, joinSink.getInput().getDataExchangeMode());
+                       assertEquals(toDirectSink, directSink.getInput().getDataExchangeMode());
+
+                       assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+                       assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+
+                       assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
+                       assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+               for (SinkPlanNode node : collection) {
+                       String nodeName = node.getOptimizerNode().getOperator().getName();
+                       if (nodeName != null && nodeName.equals(name)) {
+                               return node;
+                       }
+               }
+
+               throw new IllegalArgumentException("No node with that name was found.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
new file mode 100644
index 0000000..1a14be5
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.traversals.BranchesVisitor;
+import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
+import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test checks whether connections are correctly marked as pipeline-breaking.
+ */
+@SuppressWarnings("serial")
+public class PipelineBreakingTest {
+
+       /**
+        * Tests that no pipeline breakers are inserted into a simple forward
+        * pipeline.
+        *
+        * <pre>
+        *     (source) -> (map) -> (filter) -> (groupBy / reduce)
+        * </pre>
+        */
+       @Test
+       public void testSimpleForwardPlan() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<String> dataSet = env.readTextFile("/never/accessed");
+                       dataSet
+                               .map(new MapFunction<String, Integer>() {
+                                       @Override
+                                       public Integer map(String value) {
+                                               return 0;
+                                       }
+                               })
+                               .filter(new FilterFunction<Integer>() {
+                                       @Override
+                                       public boolean filter(Integer value) {
+                                               return false;
+                                       }
+                               })
+                               .groupBy(new IdentityKeyExtractor<Integer>())
+                               .reduceGroup(new Top1GroupReducer<Integer>())
+                               .output(new DiscardingOutputFormat<Integer>());
+
+                       DataSinkNode sinkNode = convertPlan(env.createProgramPlan()).get(0);
+
+                       SingleInputNode reduceNode = (SingleInputNode) sinkNode.getPredecessorNode();
+                       SingleInputNode keyExtractorNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+                       SingleInputNode filterNode = (SingleInputNode) keyExtractorNode.getPredecessorNode();
+                       SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
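+                       // a pure forward chain can stream end-to-end, so no connection breaks the pipeline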
+                       assertFalse(sinkNode.getInputConnection().isBreakingPipeline());
+                       assertFalse(reduceNode.getIncomingConnection().isBreakingPipeline());
+                       assertFalse(keyExtractorNode.getIncomingConnection().isBreakingPipeline());
+                       assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+                       assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Tests that branching plans, where the branches are not re-joined,
+        * do not place pipeline breakers.
+        * 
+        * <pre>
+        *                      /---> (filter) -> (sink)
+        *                     /
+        *                    /
+        * (source) -> (map) -----------------\
+        *                    \               (join) -> (sink)
+        *                     \   (source) --/
+        *                      \
+        *                       \
+        *                        \-> (sink)
+        * </pre>
+        */
+       @Test
+       public void testBranchingPlanNotReJoined() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Integer> data = env.readTextFile("/never/accessed")
+                               .map(new MapFunction<String, Integer>() {
+                                       @Override
+                                       public Integer map(String value) {
+                                               return 0;
+                                       }
+                               });
+
+                       // output 1
+                       data
+                               .filter(new FilterFunction<Integer>() {
+                                       @Override
+                                       public boolean filter(Integer value) {
+                                               return false;
+                                       }
+                               })
+                               .output(new DiscardingOutputFormat<Integer>());
+
+                       // output 2 does a join before the sink
+                       data
+                               .join(env.fromElements(1, 2, 3, 4))
+                                       .where(new IdentityKeyExtractor<Integer>())
+                                       .equalTo(new IdentityKeyExtractor<Integer>())
+                               .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+                       // output 3 is direct
+                       data
+                               .output(new DiscardingOutputFormat<Integer>());
+
+                       List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+                       // gather the optimizer DAG nodes
+
+                       DataSinkNode sinkAfterFilter = sinks.get(0);
+                       DataSinkNode sinkAfterJoin = sinks.get(1);
+                       DataSinkNode sinkDirect = sinks.get(2);
+
+                       SingleInputNode filterNode = (SingleInputNode) sinkAfterFilter.getPredecessorNode();
+                       SingleInputNode mapNode = (SingleInputNode) filterNode.getPredecessorNode();
+
+                       TwoInputNode joinNode = (TwoInputNode) sinkAfterJoin.getPredecessorNode();
+                       SingleInputNode joinInput = (SingleInputNode) joinNode.getSecondPredecessorNode();
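+                       // the key-selector join wraps its inputs in key-extraction mappers,
+                       // which is why the second predecessor is a SingleInputNode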
+
+                       // verify the non-pipeline breaking status
+
+                       assertFalse(sinkAfterFilter.getInputConnection().isBreakingPipeline());
+                       assertFalse(sinkAfterJoin.getInputConnection().isBreakingPipeline());
+                       assertFalse(sinkDirect.getInputConnection().isBreakingPipeline());
+
+                       assertFalse(filterNode.getIncomingConnection().isBreakingPipeline());
+                       assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+
+                       assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+                       assertFalse(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+                       assertFalse(joinInput.getIncomingConnection().isBreakingPipeline());
+
+                       // some other sanity checks on the plan construction (cannot hurt)
+
+                       assertEquals(mapNode, ((SingleInputNode) joinNode.getFirstPredecessorNode()).getPredecessorNode());
+                       assertEquals(mapNode, sinkDirect.getPredecessorNode());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Tests that branches that are re-joined have pipeline breakers placed.
+        * 
+        * <pre>
+        *                                         /-> (sink)
+        *                                        /
+        *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
+        *                        /               \        /
+        *     (source) -> (map) -                (join) -+-----\
+        *                        \               /              \
+        *                         \-> (filter) -+                \
+        *                                       \                (co group) -> (sink)
+        *                                        \                /
+        *                                         \-> (reduce) - /
+        * </pre>
+        */
+       @Test
+       public void testReJoinedBranches() {
+               try {
+                       // build a test program
+
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+                                       .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+                                               @Override
+                                               public Tuple2<Long, Long> map(Long value) {
+                                                       return new Tuple2<Long, Long>(value, value);
+                                               }
+                                       });
+
+                       DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+                       reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       
+                       DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+                               @Override
+                               public boolean filter(Tuple2<Long, Long> value) throws Exception {
+                                       return false;
+                               }
+                       });
+                       
+                       DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+                                       .where(1).equalTo(1)
+                                       .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+                       
+                       joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+                                       .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+                       joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+                                       .where(0).equalTo(0)
+                                       .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+                                       .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+                       List<DataSinkNode> sinks = convertPlan(env.createProgramPlan());
+
+                       // gather the optimizer DAG nodes
+
+                       DataSinkNode sinkAfterReduce = sinks.get(0);
+                       DataSinkNode sinkAfterFlatMap = sinks.get(1);
+                       DataSinkNode sinkAfterCoGroup = sinks.get(2);
+
+                       SingleInputNode reduceNode = (SingleInputNode) sinkAfterReduce.getPredecessorNode();
+                       SingleInputNode mapNode = (SingleInputNode) reduceNode.getPredecessorNode();
+
+                       SingleInputNode flatMapNode = (SingleInputNode) sinkAfterFlatMap.getPredecessorNode();
+                       TwoInputNode joinNode = (TwoInputNode) flatMapNode.getPredecessorNode();
+                       SingleInputNode filterNode = (SingleInputNode) joinNode.getSecondPredecessorNode();
+
+                       TwoInputNode coGroupNode = (TwoInputNode) sinkAfterCoGroup.getPredecessorNode();
+                       SingleInputNode otherReduceNode = (SingleInputNode) coGroupNode.getSecondPredecessorNode();
+
+                       // test sanity checks (that we constructed the DAG correctly)
+
+                       assertEquals(reduceNode, joinNode.getFirstPredecessorNode());
+                       assertEquals(mapNode, filterNode.getPredecessorNode());
+                       assertEquals(joinNode, coGroupNode.getFirstPredecessorNode());
+                       assertEquals(filterNode, otherReduceNode.getPredecessorNode());
+
+                       // verify the pipeline breaking status
+
+                       assertFalse(sinkAfterReduce.getInputConnection().isBreakingPipeline());
+                       assertFalse(sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
+                       assertFalse(sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
+
+                       assertFalse(mapNode.getIncomingConnection().isBreakingPipeline());
+                       assertFalse(flatMapNode.getIncomingConnection().isBreakingPipeline());
+                       assertFalse(joinNode.getFirstIncomingConnection().isBreakingPipeline());
+                       assertFalse(coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
+                       assertFalse(coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
+
+                       // these should be pipeline breakers
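+                       // (if both re-joined branches streamed concurrently they could deadlock,
+                       // so the optimizer breaks the pipeline on the re-joining inputs)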
+                       assertTrue(reduceNode.getIncomingConnection().isBreakingPipeline());
+                       assertTrue(filterNode.getIncomingConnection().isBreakingPipeline());
+                       assertTrue(otherReduceNode.getIncomingConnection().isBreakingPipeline());
+                       assertTrue(joinNode.getSecondIncomingConnection().isBreakingPipeline());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private static List<DataSinkNode> convertPlan(Plan p) {
+               GraphCreatingVisitor dagCreator =
+                               new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
+
+               // create the DAG
+               p.accept(dagCreator);
+               List<DataSinkNode> sinks = dagCreator.getSinks();
+
+               // build a single root and run the branch tracking logic
+               OptimizerNode rootNode;
+               if (sinks.size() == 1) {
+                       rootNode = sinks.get(0);
+               }
+               else {
+                       Iterator<DataSinkNode> iter = sinks.iterator();
+                       rootNode = iter.next();
+
+                       while (iter.hasNext()) {
+                               rootNode = new SinkJoiner(rootNode, iter.next());
+                       }
+               }
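+               // assigning node IDs and estimates is a prerequisite for the branch tracking pass below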
+               rootNode.accept(new IdAndEstimatesVisitor(null));
+               rootNode.accept(new BranchesVisitor());
+
+               return sinks;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..3e32905
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class GlobalPropertiesFilteringTest {
+
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       @Test
+       public void testAllErased1() {
+
+               SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1));
+               gprops.addUniqueFieldCombination(new FieldSet(3, 4));
+               gprops.addUniqueFieldCombination(new FieldSet(5, 6));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
+
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+               assertNull(result.getPartitioningFields());
+               assertNull(result.getPartitioningOrdering());
+               assertNull(result.getUniqueFieldCombination());
+       }
+
+       @Test
+       public void testAllErased2() {
+
+               SingleInputSemanticProperties semProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(semProps, new String[]{"2"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1));
+               gprops.addUniqueFieldCombination(new FieldSet(3, 4));
+               gprops.addUniqueFieldCombination(new FieldSet(5, 6));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
+
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+               assertNull(result.getPartitioningFields());
+               assertNull(result.getPartitioningOrdering());
+               assertNull(result.getUniqueFieldCombination());
+       }
+
+       @Test
+       public void testHashPartitioningPreserved1() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(3, pFields.size());
+               assertTrue(pFields.contains(0));
+               assertTrue(pFields.contains(1));
+               assertTrue(pFields.contains(4));
+       }
+
+       @Test
+       public void testHashPartitioningPreserved2() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(3, pFields.size());
+               assertTrue(pFields.contains(1));
+               assertTrue(pFields.contains(2));
+               assertTrue(pFields.contains(3));
+       }
+
+       @Test
+       public void testHashPartitioningErased() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1, 4));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+               assertNull(result.getPartitioningFields());
+       }
+
+       @Test
+       public void testAnyPartitioningPreserved1() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setAnyPartitioning(new FieldList(0, 1, 4));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(3, pFields.size());
+               assertTrue(pFields.contains(0));
+               assertTrue(pFields.contains(1));
+               assertTrue(pFields.contains(4));
+       }
+
+       @Test
+       public void testAnyPartitioningPreserved2() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setAnyPartitioning(new FieldList(0, 1, 4));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.ANY_PARTITIONING, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(3, pFields.size());
+               assertTrue(pFields.contains(1));
+               assertTrue(pFields.contains(2));
+               assertTrue(pFields.contains(3));
+       }
+
+       @Test
+       public void testAnyPartitioningErased() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setAnyPartitioning(new FieldList(0, 1, 4));
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+               assertNull(result.getPartitioningFields());
+       }
+
+       @Test
+       public void testCustomPartitioningPreserved1() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
+               gprops.setCustomPartitioned(new FieldList(0, 4), myP);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(2, pFields.size());
+               assertTrue(pFields.contains(0));
+               assertTrue(pFields.contains(4));
+               assertEquals(myP, result.getCustomPartitioner());
+       }
+
+       @Test
+       public void testCustomPartitioningPreserved2() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
+               gprops.setCustomPartitioned(new FieldList(0, 4), myP);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(2, pFields.size());
+               assertTrue(pFields.contains(1));
+               assertTrue(pFields.contains(3));
+               assertEquals(myP, result.getCustomPartitioner());
+       }
+
+       @Test
+       public void testCustomPartitioningErased() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               Partitioner<Tuple2<Long, Integer>> myP = new MockPartitioner();
+               gprops.setCustomPartitioned(new FieldList(0, 4), myP);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+               assertNull(result.getPartitioningFields());
+               assertNull(result.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningPreserved1() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;2;5"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(2, StringValue.class, Order.ASCENDING);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setRangePartitioned(o);
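+               // the range partitioning survives only because all ordering fields (1, 5, 2) are preserved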
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(3, pFields.size());
+               assertEquals(1, pFields.get(0).intValue());
+               assertEquals(5, pFields.get(1).intValue());
+               assertEquals(2, pFields.get(2).intValue());
+               Ordering pOrder = result.getPartitioningOrdering();
+               assertEquals(3, pOrder.getNumberOfFields());
+               assertEquals(1, pOrder.getFieldNumber(0).intValue());
+               assertEquals(5, pOrder.getFieldNumber(1).intValue());
+               assertEquals(2, pOrder.getFieldNumber(2).intValue());
+               assertEquals(Order.ASCENDING, pOrder.getOrder(0));
+               assertEquals(Order.DESCENDING, pOrder.getOrder(1));
+               assertEquals(Order.ASCENDING, pOrder.getOrder(2));
+               assertEquals(IntValue.class, pOrder.getType(0));
+               assertEquals(LongValue.class, pOrder.getType(1));
+               assertEquals(StringValue.class, pOrder.getType(2));
+       }
+
+       @Test
+       public void testRangePartitioningPreserved2() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1->3; 2->0; 5->1"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(2, StringValue.class, Order.ASCENDING);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setRangePartitioned(o);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, result.getPartitioning());
+               FieldList pFields = result.getPartitioningFields();
+               assertEquals(3, pFields.size());
+               assertEquals(3, pFields.get(0).intValue());
+               assertEquals(1, pFields.get(1).intValue());
+               assertEquals(0, pFields.get(2).intValue());
+               Ordering pOrder = result.getPartitioningOrdering();
+               assertEquals(3, pOrder.getNumberOfFields());
+               assertEquals(3, pOrder.getFieldNumber(0).intValue());
+               assertEquals(1, pOrder.getFieldNumber(1).intValue());
+               assertEquals(0, pOrder.getFieldNumber(2).intValue());
+               assertEquals(Order.ASCENDING, pOrder.getOrder(0));
+               assertEquals(Order.DESCENDING, pOrder.getOrder(1));
+               assertEquals(Order.ASCENDING, pOrder.getOrder(2));
+               assertEquals(IntValue.class, pOrder.getType(0));
+               assertEquals(LongValue.class, pOrder.getType(1));
+               assertEquals(StringValue.class, pOrder.getType(2));
+       }
+
+       @Test
+       public void testRangePartitioningErased() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"1;5"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(2, StringValue.class, Order.ASCENDING);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setRangePartitioned(o);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
+               assertNull(result.getPartitioningOrdering());
+               assertNull(result.getPartitioningFields());
+       }
+
+       @Test
+       public void testRebalancingPreserved() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->1; 1->2; 4->3"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setForcedRebalanced();
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+
+               assertEquals(PartitioningProperty.FORCED_REBALANCED, result.getPartitioning());
+               assertNull(result.getPartitioningFields());
+       }
+
+       @Test
+       public void testUniqueFieldGroupsPreserved1() {
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+               FieldSet set1 = new FieldSet(0, 1, 2);
+               FieldSet set2 = new FieldSet(3, 4);
+               FieldSet set3 = new FieldSet(4, 5, 6, 7);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.addUniqueFieldCombination(set1);
+               gprops.addUniqueFieldCombination(set2);
+               gprops.addUniqueFieldCombination(set3);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+               Set<FieldSet> unique = result.getUniqueFieldCombination();
+               FieldSet expected1 = new FieldSet(0, 1, 2);
+               FieldSet expected2 = new FieldSet(3, 4);
+
+               Assert.assertTrue(unique.size() == 2);
+               Assert.assertTrue(unique.contains(expected1));
+               Assert.assertTrue(unique.contains(expected2));
+       }
+
+       @Test
+       public void testUniqueFieldGroupsPreserved2() {
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0->5;1;2;3->6;4"}, null, null, tupleInfo, tupleInfo);
+
+               FieldSet set1 = new FieldSet(0, 1, 2);
+               FieldSet set2 = new FieldSet(3, 4);
+               FieldSet set3 = new FieldSet(4, 5, 6, 7);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.addUniqueFieldCombination(set1);
+               gprops.addUniqueFieldCombination(set2);
+               gprops.addUniqueFieldCombination(set3);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+               Set<FieldSet> unique = result.getUniqueFieldCombination();
+               FieldSet expected1 = new FieldSet(1, 2, 5);
+               FieldSet expected2 = new FieldSet(4, 6);
+
+               Assert.assertTrue(unique.size() == 2);
+               Assert.assertTrue(unique.contains(expected1));
+               Assert.assertTrue(unique.contains(expected2));
+       }
+
+       @Test
+       public void testUniqueFieldGroupsErased() {
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0; 3; 5; 6; 7"}, null, null, tupleInfo, tupleInfo);
+
+               FieldSet set1 = new FieldSet(0, 1, 2);
+               FieldSet set2 = new FieldSet(3, 4);
+               FieldSet set3 = new FieldSet(4, 5, 6, 7);
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.addUniqueFieldCombination(set1);
+               gprops.addUniqueFieldCombination(set2);
+               gprops.addUniqueFieldCombination(set3);
+
+               GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
+               Assert.assertNull(result.getUniqueFieldCombination());
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               GlobalProperties gprops = new GlobalProperties();
+               gprops.setHashPartitioned(new FieldList(0, 1));
+
+               gprops.filterBySemanticProperties(sprops, 1);
+       }
+
+}
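
A note on the unique-field tests above: they all encode the same rule, namely that a unique field combination survives filterBySemanticProperties() only if every one of its fields is forwarded. The following sketch condenses that behavior; it reuses only calls shown in the diff above and assumes a tupleInfo of the same Tuple8 type the test class declares.

    SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
    // forward fields 0-4 unchanged; fields 5-7 are dropped
    SemanticPropUtil.getSemanticPropsSingleFromString(
            sprops, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);

    GlobalProperties gprops = new GlobalProperties();
    gprops.addUniqueFieldCombination(new FieldSet(0, 1, 2)); // fully forwarded -> kept
    gprops.addUniqueFieldCombination(new FieldSet(4, 5, 6)); // fields 5 and 6 dropped -> erased

    GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
    // result.getUniqueFieldCombination() now contains only {0, 1, 2}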

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
new file mode 100644
index 0000000..52826d6
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+public class GlobalPropertiesMatchingTest {
+
+       @Test
+       public void testMatchingAnyPartitioning() {
+               try {
+                       
+                       RequestedGlobalProperties req = new 
RequestedGlobalProperties();
+                       req.setAnyPartitioning(new FieldSet(6, 2));
+                       
+                       // match any partitioning
+                       {
+                               GlobalProperties gp1 = new GlobalProperties();
+                               gp1.setAnyPartitioning(new FieldList(2, 6));
+                               assertTrue(req.isMetBy(gp1));
+
+                               GlobalProperties gp2 = new GlobalProperties();
+                               gp2.setAnyPartitioning(new FieldList(6, 2));
+                               assertTrue(req.isMetBy(gp2));
+
+                               GlobalProperties gp3 = new GlobalProperties();
+                               gp3.setAnyPartitioning(new FieldList(6, 2, 4));
+                               assertFalse(req.isMetBy(gp3));
+
+                               GlobalProperties gp4 = new GlobalProperties();
+                               gp4.setAnyPartitioning(new FieldList(6, 1));
+                               assertFalse(req.isMetBy(gp4));
+
+                               GlobalProperties gp5 = new GlobalProperties();
+                               gp5.setAnyPartitioning(new FieldList(2));
+                               assertTrue(req.isMetBy(gp5));
+                       }
+
+                       // match hash partitioning
+                       {
+                               GlobalProperties gp1 = new GlobalProperties();
+                               gp1.setHashPartitioned(new FieldList(2, 6));
+                               assertTrue(req.isMetBy(gp1));
+
+                               GlobalProperties gp2 = new GlobalProperties();
+                               gp2.setHashPartitioned(new FieldList(6, 2));
+                               assertTrue(req.isMetBy(gp2));
+
+                               GlobalProperties gp3 = new GlobalProperties();
+                               gp3.setHashPartitioned(new FieldList(6, 1));
+                               assertFalse(req.isMetBy(gp3));
+                       }
+                       
+                       // match range partitioning
+                       {
+                               GlobalProperties gp1 = new GlobalProperties();
+                               gp1.setRangePartitioned(new Ordering(2, null, 
Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+                               assertTrue(req.isMetBy(gp1));
+                               
+                               GlobalProperties gp2 = new GlobalProperties();
+                               gp2.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+                               assertTrue(req.isMetBy(gp2));
+
+                               GlobalProperties gp3 = new GlobalProperties();
+                               gp3.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+                               assertFalse(req.isMetBy(gp3));
+                               
+                               GlobalProperties gp4 = new GlobalProperties();
+                               gp4.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING));
+                               assertTrue(req.isMetBy(gp4));
+                       }
+                       
+                       // match custom partitioning
+                       {
+                               GlobalProperties gp1 = new GlobalProperties();
+                               gp1.setCustomPartitioned(new FieldList(2, 6), 
new MockPartitioner());
+                               assertTrue(req.isMetBy(gp1));
+                               
+                               GlobalProperties gp2 = new GlobalProperties();
+                               gp2.setCustomPartitioned(new FieldList(6, 2), 
new MockPartitioner());
+                               assertTrue(req.isMetBy(gp2));
+                               
+                               GlobalProperties gp3 = new GlobalProperties();
+                               gp3.setCustomPartitioned(new FieldList(6, 1), 
new MockPartitioner());
+                               assertFalse(req.isMetBy(gp3));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testMatchingCustomPartitioning() {
+               try {
+                       final Partitioner<Tuple2<Long, Integer>> partitioner = 
new MockPartitioner();
+                       
+                       RequestedGlobalProperties req = new 
RequestedGlobalProperties();
+                       req.setCustomPartitioned(new FieldSet(6, 2), 
partitioner);
+                       
+                       // match custom partitionings
+                       {
+                               GlobalProperties gp1 = new GlobalProperties();
+                               gp1.setCustomPartitioned(new FieldList(2, 6), 
partitioner);
+                               assertTrue(req.isMetBy(gp1));
+                               
+                               GlobalProperties gp2 = new GlobalProperties();
+                               gp2.setCustomPartitioned(new FieldList(6, 2), 
partitioner);
+                               assertTrue(req.isMetBy(gp2));
+                               
+                               GlobalProperties gp3 = new GlobalProperties();
+                               gp3.setCustomPartitioned(new FieldList(6, 2), 
new MockPartitioner());
+                               assertFalse(req.isMetBy(gp3));
+                       }
+                       
+                       // cannot match other types of partitionings
+                       {
+                               GlobalProperties gp1 = new GlobalProperties();
+                               gp1.setAnyPartitioning(new FieldList(6, 2));
+                               assertFalse(req.isMetBy(gp1));
+                               
+                               GlobalProperties gp2 = new GlobalProperties();
+                               gp2.setHashPartitioned(new FieldList(6, 2));
+                               assertFalse(req.isMetBy(gp2));
+                               
+                               GlobalProperties gp3 = new GlobalProperties();
+                               gp3.setRangePartitioned(new Ordering(2, null, 
Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+                               assertFalse(req.isMetBy(gp3));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testStrictlyMatchingAnyPartitioning() {
+
+               RequestedGlobalProperties req = new RequestedGlobalProperties();
+               req.setAnyPartitioning(new FieldList(6, 2));
+
+               // match any partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setAnyPartitioning(new FieldList(6, 2));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setAnyPartitioning(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setAnyPartitioning(new FieldList(6, 2, 3));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setAnyPartitioning(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp4));
+
+                       GlobalProperties gp5 = new GlobalProperties();
+                       gp5.setAnyPartitioning(new FieldList(2));
+                       assertFalse(req.isMetBy(gp5));
+               }
+
+               // match hash partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setHashPartitioned(new FieldList(6, 2));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setHashPartitioned(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setHashPartitioned(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp3));
+               }
+
+               // match range partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setRangePartitioned(new Ordering(2, null, 
Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+       }
+
+       @Test
+       public void testStrictlyMatchingHashPartitioning() {
+
+               RequestedGlobalProperties req = new RequestedGlobalProperties();
+               req.setHashPartitioned(new FieldList(6, 2));
+
+               // any partitioning cannot meet a strict hash-partitioning requirement
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setAnyPartitioning(new FieldList(6, 2));
+                       assertFalse(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setAnyPartitioning(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setAnyPartitioning(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setAnyPartitioning(new FieldList(2));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+               // match hash partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setHashPartitioned(new FieldList(6, 2));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setHashPartitioned(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setHashPartitioned(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setHashPartitioned(new FieldList(6, 2, 0));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+               // range partitioning cannot meet a strict hash-partitioning requirement
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setRangePartitioned(new Ordering(2, null, 
Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+}
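
The two "any partitioning" tests in this file differ in one detail that is easy to miss: testMatchingAnyPartitioning builds the request from a FieldSet, which is met by the fields in any order (or a subset), while testStrictlyMatchingAnyPartitioning builds it from a FieldList, which demands the exact sequence. A minimal sketch of the contrast, using only calls that appear above:

    RequestedGlobalProperties loose = new RequestedGlobalProperties();
    loose.setAnyPartitioning(new FieldSet(6, 2));    // order-insensitive request

    RequestedGlobalProperties strict = new RequestedGlobalProperties();
    strict.setAnyPartitioning(new FieldList(6, 2));  // order-sensitive request

    GlobalProperties gp = new GlobalProperties();
    gp.setHashPartitioned(new FieldList(2, 6));      // same fields, reversed order

    // loose.isMetBy(gp)  -> true
    // strict.isMetBy(gp) -> false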

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
new file mode 100644
index 0000000..0868720
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesPushdownTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+public class GlobalPropertiesPushdownTest {
+
+       @Test
+       public void testAnyPartitioningPushedDown() {
+               try {
+                       RequestedGlobalProperties req = new 
RequestedGlobalProperties();
+                       req.setAnyPartitioning(new FieldSet(3, 1));
+                       
+                       RequestedGlobalProperties preserved = 
req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+                       assertEquals(PartitioningProperty.ANY_PARTITIONING, 
preserved.getPartitioning());
+                       
assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+                       
+                       RequestedGlobalProperties nonPreserved = 
req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
+                       assertTrue(nonPreserved == null || 
nonPreserved.isTrivial());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testHashPartitioningPushedDown() {
+               try {
+                       RequestedGlobalProperties req = new 
RequestedGlobalProperties();
+                       req.setHashPartitioned(new FieldSet(3, 1));
+                       
+                       RequestedGlobalProperties preserved = 
req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+                       assertEquals(PartitioningProperty.HASH_PARTITIONED, 
preserved.getPartitioning());
+                       
assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+                       
+                       RequestedGlobalProperties nonPreserved = 
req.filterBySemanticProperties(getNonePreservingSemProps(), 0);
+                       assertTrue(nonPreserved == null || 
nonPreserved.isTrivial());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testCustomPartitioningNotPushedDown() {
+               try {
+                       RequestedGlobalProperties req = new 
RequestedGlobalProperties();
+                       req.setCustomPartitioned(new FieldSet(3, 1), new 
MockPartitioner());
+                       
+                       RequestedGlobalProperties pushedDown = 
req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+                       assertTrue(pushedDown == null || 
pushedDown.isTrivial());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testForcedRebalancingNotPushedDown() {
+               try {
+                       RequestedGlobalProperties req = new 
RequestedGlobalProperties();
+                       req.setForceRebalancing();
+                       
+                       RequestedGlobalProperties pushedDown = 
req.filterBySemanticProperties(getAllPreservingSemProps(), 0);
+                       assertTrue(pushedDown == null || 
pushedDown.isTrivial());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private static SemanticProperties getAllPreservingSemProps() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+       
+       private static SemanticProperties getNonePreservingSemProps() {
+               return new SingleInputSemanticProperties();
+       }
+}
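
Taken together, the four tests above pin down the pushdown rule: hash and any partitioning requests survive a function that forwards all fields, while custom partitioning and forced rebalancing must be established at the operator itself and are never pushed down. A condensed sketch of the two outcomes, assuming the same imports as the test:

    SemanticProperties allForwarded =
            new SingleInputSemanticProperties.AllFieldsForwardedProperties();

    RequestedGlobalProperties hashReq = new RequestedGlobalProperties();
    hashReq.setHashPartitioned(new FieldSet(3, 1));
    // pushed down: the requirement can be satisfied before the operator
    RequestedGlobalProperties pushed = hashReq.filterBySemanticProperties(allForwarded, 0);

    RequestedGlobalProperties rebalanceReq = new RequestedGlobalProperties();
    rebalanceReq.setForceRebalancing();
    // not pushed down: expected to come back null or trivial
    RequestedGlobalProperties notPushed = rebalanceReq.filterBySemanticProperties(allForwarded, 0);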

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..1ff62ed
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/LocalPropertiesFilteringTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+public class LocalPropertiesFilteringTest {
+
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, 
Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       @Test
+       public void testAllErased1() {
+
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, 
null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = LocalProperties.forGrouping(new 
FieldList(0, 1, 2));
+               lProps = lProps.addUniqueFields(new FieldSet(3,4));
+               lProps = lProps.addUniqueFields(new FieldSet(5,6));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+
+               assertNull(filtered.getGroupedFields());
+               assertNull(filtered.getOrdering());
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testAllErased2() {
+
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"5"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = LocalProperties.forGrouping(new 
FieldList(0, 1, 2));
+               lProps = lProps.addUniqueFields(new FieldSet(3,4));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+
+               assertNull(filtered.getGroupedFields());
+               assertNull(filtered.getOrdering());
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testGroupingPreserved1() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = LocalProperties.forGrouping(new 
FieldList(0, 2, 3));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(3, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(0));
+               assertTrue(filtered.getGroupedFields().contains(2));
+               assertTrue(filtered.getGroupedFields().contains(3));
+               assertNull(filtered.getOrdering());
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testGroupingPreserved2() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = LocalProperties.forGrouping(new 
FieldList(0, 2, 3));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(3, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(4));
+               assertTrue(filtered.getGroupedFields().contains(0));
+               assertTrue(filtered.getGroupedFields().contains(7));
+               assertNull(filtered.getOrdering());
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testGroupingErased() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = LocalProperties.forGrouping(new 
FieldList(0, 2, 3));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+
+               assertNull(filtered.getGroupedFields());
+               assertNull(filtered.getOrdering());
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testSortingPreserved1() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               LocalProperties lProps = LocalProperties.forOrdering(o);
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldList gFields = filtered.getGroupedFields();
+               Ordering order = filtered.getOrdering();
+
+               assertNotNull(gFields);
+               assertEquals(3, gFields.size());
+               assertTrue(gFields.contains(0));
+               assertTrue(gFields.contains(2));
+               assertTrue(gFields.contains(5));
+               assertNotNull(order);
+               assertEquals(3, order.getNumberOfFields());
+               assertEquals(2, order.getFieldNumber(0).intValue());
+               assertEquals(0, order.getFieldNumber(1).intValue());
+               assertEquals(5, order.getFieldNumber(2).intValue());
+               assertEquals(Order.ASCENDING, order.getOrder(0));
+               assertEquals(Order.DESCENDING, order.getOrder(1));
+               assertEquals(Order.DESCENDING, order.getOrder(2));
+               assertEquals(IntValue.class, order.getType(0));
+               assertEquals(StringValue.class, order.getType(1));
+               assertEquals(LongValue.class, order.getType(2));
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testSortingPreserved2() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               LocalProperties lProps = LocalProperties.forOrdering(o);
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldList gFields = filtered.getGroupedFields();
+               Ordering order = filtered.getOrdering();
+
+               assertNotNull(gFields);
+               assertEquals(3, gFields.size());
+               assertTrue(gFields.contains(3));
+               assertTrue(gFields.contains(7));
+               assertTrue(gFields.contains(1));
+               assertNotNull(order);
+               assertEquals(3, order.getNumberOfFields());
+               assertEquals(7, order.getFieldNumber(0).intValue());
+               assertEquals(3, order.getFieldNumber(1).intValue());
+               assertEquals(1, order.getFieldNumber(2).intValue());
+               assertEquals(Order.ASCENDING, order.getOrder(0));
+               assertEquals(Order.DESCENDING, order.getOrder(1));
+               assertEquals(Order.DESCENDING, order.getOrder(2));
+               assertEquals(IntValue.class, order.getType(0));
+               assertEquals(StringValue.class, order.getType(1));
+               assertEquals(LongValue.class, order.getType(2));
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testSortingPreserved3() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               LocalProperties lProps = LocalProperties.forOrdering(o);
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldList gFields = filtered.getGroupedFields();
+               Ordering order = filtered.getOrdering();
+
+               assertNotNull(gFields);
+               assertEquals(2, gFields.size());
+               assertTrue(gFields.contains(0));
+               assertTrue(gFields.contains(2));
+               assertNotNull(order);
+               assertEquals(2, order.getNumberOfFields());
+               assertEquals(2, order.getFieldNumber(0).intValue());
+               assertEquals(0, order.getFieldNumber(1).intValue());
+               assertEquals(Order.ASCENDING, order.getOrder(0));
+               assertEquals(Order.DESCENDING, order.getOrder(1));
+               assertEquals(IntValue.class, order.getType(0));
+               assertEquals(StringValue.class, order.getType(1));
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testSortingPreserved4() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               LocalProperties lProps = LocalProperties.forOrdering(o);
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldList gFields = filtered.getGroupedFields();
+               Ordering order = filtered.getOrdering();
+
+               assertNotNull(gFields);
+               assertEquals(1, gFields.size());
+               assertTrue(gFields.contains(7));
+               assertNotNull(order);
+               assertEquals(1, order.getNumberOfFields());
+               assertEquals(7, order.getFieldNumber(0).intValue());
+               assertEquals(Order.ASCENDING, order.getOrder(0));
+               assertEquals(IntValue.class, order.getType(0));
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testSortingErased() {
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;5"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+               o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+               LocalProperties lProps = LocalProperties.forOrdering(o);
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldList gFields = filtered.getGroupedFields();
+               Ordering order = filtered.getOrdering();
+
+               assertNull(gFields);
+               assertNull(order);
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test
+       public void testUniqueFieldsPreserved1() {
+
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = new LocalProperties();
+               lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+               lProps = lProps.addUniqueFields(new FieldSet(3,4));
+               lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldSet expected1 = new FieldSet(0,1,2);
+               FieldSet expected2 = new FieldSet(3,4);
+
+               assertNull(filtered.getGroupedFields());
+               assertNull(filtered.getOrdering());
+               assertNotNull(filtered.getUniqueFields());
+               assertEquals(2, filtered.getUniqueFields().size());
+               assertTrue(filtered.getUniqueFields().contains(expected1));
+               assertTrue(filtered.getUniqueFields().contains(expected2));
+       }
+
+       @Test
+       public void testUniqueFieldsPreserved2() {
+
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = LocalProperties.forGrouping(new 
FieldList(1,2));
+               lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+               lProps = lProps.addUniqueFields(new FieldSet(3,4));
+               lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldSet expected1 = new FieldSet(0,1,2);
+               FieldSet expected2 = new FieldSet(3,4);
+
+               assertNull(filtered.getOrdering());
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(2, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(1));
+               assertTrue(filtered.getGroupedFields().contains(2));
+               assertNotNull(filtered.getUniqueFields());
+               assertEquals(2, filtered.getUniqueFields().size());
+               assertTrue(filtered.getUniqueFields().contains(expected1));
+               assertTrue(filtered.getUniqueFields().contains(expected2));
+       }
+
+       @Test
+       public void testUniqueFieldsPreserved3() {
+
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = new LocalProperties();
+               lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+               lProps = lProps.addUniqueFields(new FieldSet(3,4));
+               lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+               FieldSet expected1 = new FieldSet(5,6,7);
+               FieldSet expected2 = new FieldSet(3,4);
+
+               assertNull(filtered.getGroupedFields());
+               assertNull(filtered.getOrdering());
+               assertNotNull(filtered.getUniqueFields());
+               assertEquals(2, filtered.getUniqueFields().size());
+               assertTrue(filtered.getUniqueFields().contains(expected1));
+               assertTrue(filtered.getUniqueFields().contains(expected2));
+       }
+
+       @Test
+       public void testUniqueFieldsErased() {
+
+               SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sp, new 
String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lProps = new LocalProperties();
+               lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+               lProps = lProps.addUniqueFields(new FieldSet(3,4));
+               lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+               LocalProperties filtered = 
lProps.filterBySemanticProperties(sp, 0);
+
+               assertNull(filtered.getGroupedFields());
+               assertNull(filtered.getOrdering());
+               assertNull(filtered.getUniqueFields());
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+
+               SingleInputSemanticProperties sprops = new 
SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new 
String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               LocalProperties lprops = LocalProperties.forGrouping(new 
FieldList(0,1));
+
+               lprops.filterBySemanticProperties(sprops, 1);
+       }
+
+}
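
The sorting tests above encode a prefix rule: a filtered ordering keeps only its longest leading run of forwarded fields. That is why testSortingPreserved4 retains a single field, field 2 (renamed to 7), even though field 5 is also forwarded: the unforwarded field 0 cuts the prefix short. A brief sketch of that case, under the same assumptions as the test class:

    SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
    // forward field 2 to position 7 and field 5 unchanged; field 0 is dropped
    SemanticPropUtil.getSemanticPropsSingleFromString(
            sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo);

    Ordering o = new Ordering();
    o.appendOrdering(2, IntValue.class, Order.ASCENDING);     // forwarded -> survives
    o.appendOrdering(0, StringValue.class, Order.DESCENDING); // dropped -> cuts the prefix here
    o.appendOrdering(5, LongValue.class, Order.DESCENDING);   // forwarded, but after the cut

    LocalProperties filtered =
            LocalProperties.forOrdering(o).filterBySemanticProperties(sp, 0);
    // filtered.getOrdering() holds only (field 7, ASCENDING)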

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
new file mode 100644
index 0000000..74126f8
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class MockDistribution implements DataDistribution {
+
+       @Override
+       public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+               return new Key<?>[0];
+       }
+
+       @Override
+       public int getNumberOfFields() {
+               return 0;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
new file mode 100644
index 0000000..2b2ab14
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> {
+       
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public int partition(Tuple2<Long, Integer> key, int numPartitions) {
+               return 0;
+       }
+}
\ No newline at end of file
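
MockPartitioner deliberately does not override equals(), which appears to be what makes gp3 in testMatchingCustomPartitioning fail: two distinct instances compare by object identity, so a custom-partitioning requirement is only met by the very partitioner it was declared with. A short sketch of that identity check (usable from the same package, since the class is package-private):

    Partitioner<Tuple2<Long, Integer>> partitioner = new MockPartitioner();

    RequestedGlobalProperties req = new RequestedGlobalProperties();
    req.setCustomPartitioned(new FieldSet(6, 2), partitioner);

    GlobalProperties sameInstance = new GlobalProperties();
    sameInstance.setCustomPartitioned(new FieldList(6, 2), partitioner);

    GlobalProperties otherInstance = new GlobalProperties();
    otherInstance.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());

    // req.isMetBy(sameInstance)  -> true
    // req.isMetBy(otherInstance) -> false (falls back to Object.equals, no override)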
