Repository: flink
Updated Branches:
  refs/heads/master 0df5601ad -> b0a57c32f


[FLINK-1628] [optimizer] Fix partitioning properties for Joins and CoGroups.

This closes #458


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a57c32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a57c32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a57c32

Branch: refs/heads/master
Commit: b0a57c32fc68d4a1369e5ece25d3a6e986ee1e2a
Parents: 0df5601
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed Mar 4 18:49:22 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Mar 10 01:43:41 2015 +0100

----------------------------------------------------------------------
 .../dataproperties/GlobalProperties.java        |   8 +
 .../dataproperties/PartitioningProperty.java    |   2 +-
 .../RequestedGlobalProperties.java              |  59 +-
 .../operators/AbstractJoinDescriptor.java       |  49 +-
 .../compiler/operators/CoGroupDescriptor.java   |  72 +-
 .../operators/OperatorDescriptorDual.java       |  44 +-
 .../operators/SortMergeJoinDescriptor.java      |  13 +-
 .../compiler/FeedbackPropertiesMatchTest.java   |   8 +-
 .../flink/compiler/PartitioningReusageTest.java | 859 +++++++++++++++++++
 .../GlobalPropertiesMatchingTest.java           | 152 +++-
 .../RequestedGlobalPropertiesFilteringTest.java |   2 +-
 .../api/common/operators/util/FieldList.java    |  15 +-
 12 files changed, 1224 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index ca7e64d..31e13ae 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -183,6 +183,14 @@ public class GlobalProperties implements Cloneable {
                        return false;
                }
        }
+
+       public boolean isExactlyPartitionedOnFields(FieldList fields) {
+               if (this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields)) {
+                       return true;
+               } else {
+                       return false;
+               }
+       }
        
        public boolean matchesOrderedPartitioning(Ordering o) {
                if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
index 2b66ea0..45e323e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
@@ -71,7 +71,7 @@ public enum PartitioningProperty {
         *         false otherwise.
         */
        public boolean isPartitioned() {
-               return this != FULL_REPLICATION && this != FORCED_REBALANCED;
+               return this != FULL_REPLICATION && this != FORCED_REBALANCED && this != ANY_DISTRIBUTION;
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index daaa7dc..f4334ff 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
@@ -46,7 +47,7 @@ public final class RequestedGlobalProperties implements Cloneable {
        private DataDistribution dataDistribution;      // optional data distribution, for a range partitioning
        
        private Partitioner<?> customPartitioner;       // optional, partitioner for custom partitioning
-       
+
        // 
--------------------------------------------------------------------------------------------
        
        /**
@@ -60,7 +61,9 @@ public final class RequestedGlobalProperties implements Cloneable {
        
        /**
         * Sets the partitioning property for the global properties.
-        * 
+        * If the partitioned fields are given as a {@link FieldSet}, a partitioning on any subset of these fields is accepted;
+        * if they are given as a {@link FieldList}, the partitioning fields must match exactly, including their order.
+        *
         * @param partitionedFields
         */
        public void setHashPartitioned(FieldSet partitionedFields) {
        public void setHashPartitioned(FieldSet partitionedFields) {
@@ -86,7 +89,14 @@ public final class RequestedGlobalProperties implements Cloneable {
                this.partitioningFields = null;
                this.dataDistribution = dataDistribution;
        }
-       
+
+       /**
+        * Sets the partitioning property for the global properties.
+        * If the partitioned fields are given as a {@link FieldSet}, a partitioning on any subset of these fields is accepted;
+        * if they are given as a {@link FieldList}, the partitioning fields must match exactly, including their order.
+        *
+        * @param partitionedFields
+        */
        public void setAnyPartitioning(FieldSet partitionedFields) {
                if (partitionedFields == null) {
                        throw new NullPointerException();
@@ -119,7 +129,14 @@ public final class RequestedGlobalProperties implements Cloneable {
                this.partitioningFields = null;
                this.ordering = null;
        }
-       
+
+       /**
+        * Sets the partitioning property for the global properties.
+        * If the partitioned fields are given as a {@link FieldSet}, a partitioning on any subset of these fields is accepted;
+        * if they are given as a {@link FieldList}, the partitioning fields must match exactly, including their order.
+        *
+        * @param partitionedFields
+        */
        public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) {
                if (partitionedFields == null || partitioner == null) {
                        throw new NullPointerException();
@@ -130,7 +147,7 @@ public final class RequestedGlobalProperties implements Cloneable {
                this.ordering = null;
                this.customPartitioner = partitioner;
        }
-       
+
        /**
         * Gets the partitioning property.
         * 
@@ -148,7 +165,7 @@ public final class RequestedGlobalProperties implements Cloneable {
        public FieldSet getPartitionedFields() {
                return this.partitioningFields;
        }
-       
+
        /**
         * Gets the key order.
         * 
@@ -220,7 +237,13 @@ public final class RequestedGlobalProperties implements Cloneable {
                                return null;
                        case HASH_PARTITIONED:
                        case ANY_PARTITIONING:
-                               FieldSet newFields = new FieldSet();
+                               FieldSet newFields;
+                               if(this.partitioningFields instanceof FieldList) {
+                                       newFields = new FieldList();
+                               } else {
+                                       newFields = new FieldSet();
+                               }
+
                                for (Integer targetField : this.partitioningFields) {
                                        int sourceField = props.getForwardingSourceField(input, targetField);
                                        if (sourceField >= 0) {
@@ -274,11 +297,11 @@ public final class RequestedGlobalProperties implements Cloneable {
                        return true;
                }
                else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
-                       return props.isPartitionedOnFields(this.partitioningFields);
+                       return checkCompatiblePartitioningFields(props);
                }
                else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) {
                        return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
-                                       props.isPartitionedOnFields(this.partitioningFields);
+                                       checkCompatiblePartitioningFields(props);
                }
                else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
                        return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
@@ -289,14 +312,15 @@ public final class RequestedGlobalProperties implements Cloneable {
                }
                else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) {
                        return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
-                                       props.isPartitionedOnFields(this.partitioningFields) &&
+                                       checkCompatiblePartitioningFields(props) &&
                                        props.getCustomPartitioner().equals(this.customPartitioner);
+
                }
                else {
                        throw new CompilerException("Properties matching logic leaves open cases.");
                }
        }
-       
+
        /**
         * Parameterizes the ship strategy fields of a channel such that the 
channel produces the desired global properties.
         * 
@@ -307,8 +331,8 @@ public final class RequestedGlobalProperties implements Cloneable {
 
                // safety check. Fully replicated input must be preserved.
                if(channel.getSource().getGlobalProperties().isFullyReplicated() &&
-                               (       this.partitioning != PartitioningProperty.FULL_REPLICATION ||
-                                       this.partitioning != PartitioningProperty.ANY_DISTRIBUTION)) {
+                               !(this.partitioning == PartitioningProperty.FULL_REPLICATION ||
+                                       this.partitioning == PartitioningProperty.ANY_DISTRIBUTION)) {
                        throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property.");
                }
 
@@ -397,4 +421,13 @@ public final class RequestedGlobalProperties implements Cloneable {
                        throw new RuntimeException(cnse);
                }
        }
+
+       private boolean checkCompatiblePartitioningFields(GlobalProperties props) {
+               if(this.partitioningFields instanceof FieldList) {
+                       // partitioningFields as FieldList requires strict checking!
+                       return props.isExactlyPartitionedOnFields((FieldList)this.partitioningFields);
+               } else {
+                       return props.isPartitionedOnFields(this.partitioningFields);
+               }
+       }
 }
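
Editor's illustration (not part of the commit): with the changes above, a request expressed with a FieldSet is met by a hash partitioning on the requested fields in any order, whereas a FieldList demands an exact match including field order. A minimal sketch of the difference, assuming the FieldSet/FieldList constructors used in this patch and the existing setHashPartitioned(FieldList) / isMetBy(GlobalProperties) methods of the surrounding classes:

    RequestedGlobalProperties reqSet = new RequestedGlobalProperties();
    reqSet.setHashPartitioned(new FieldSet(1, 2));    // order-insensitive request

    RequestedGlobalProperties reqList = new RequestedGlobalProperties();
    reqList.setHashPartitioned(new FieldList(1, 2));  // exact-order request

    GlobalProperties produced = new GlobalProperties();
    produced.setHashPartitioned(new FieldList(2, 1)); // data is hash-partitioned on (2, 1)

    reqSet.isMetBy(produced);   // true: the partitioning fields match as a set
    reqList.isMetBy(produced);  // false: the FieldList (1, 2) requires exactly this field order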

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index b1c3079..21dd3f4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -64,15 +64,15 @@ public abstract class AbstractJoinDescriptor extends 
OperatorDescriptorDual {
                if (repartitionAllowed) {
                        // partition both (hash or custom)
                        if (this.customPartitioner == null) {
-                               
+
                                // we accept compatible partitionings of any 
type
                                RequestedGlobalProperties partitioned_left_any 
= new RequestedGlobalProperties();
                                RequestedGlobalProperties partitioned_right_any 
= new RequestedGlobalProperties();
                                
partitioned_left_any.setAnyPartitioning(this.keys1);
                                
partitioned_right_any.setAnyPartitioning(this.keys2);
                                pairs.add(new 
GlobalPropertiesPair(partitioned_left_any, partitioned_right_any));
-                               
-                               // we also explicitly add hash partitioning, as 
a fallback, if the any-pairs do not match
+
+                               // add strict hash partitioning of both inputs 
on their full key sets
                                RequestedGlobalProperties partitioned_left_hash 
= new RequestedGlobalProperties();
                                RequestedGlobalProperties 
partitioned_right_hash = new RequestedGlobalProperties();
                                
partitioned_left_hash.setHashPartitioned(this.keys1);
@@ -82,10 +82,10 @@ public abstract class AbstractJoinDescriptor extends 
OperatorDescriptorDual {
                        else {
                                RequestedGlobalProperties partitioned_left = 
new RequestedGlobalProperties();
                                
partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
-                               
+
                                RequestedGlobalProperties partitioned_right = 
new RequestedGlobalProperties();
                                
partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
-                               
+
                                return Collections.singletonList(new 
GlobalPropertiesPair(partitioned_left, partitioned_right));
                        }
                        
@@ -130,10 +130,40 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
                        GlobalProperties produced1, GlobalProperties produced2)
        {
                if (requested1.getPartitioning().isPartitionedOnKey() && requested2.getPartitioning().isPartitionedOnKey()) {
-                       return produced1.getPartitioning() == produced2.getPartitioning() && 
-                                       (produced1.getCustomPartitioner() == null ? 
-                                               produced2.getCustomPartitioner() == null :
-                                               produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+
+                       if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+                                       produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+                               // both are hash partitioned, check that partitioning fields are equivalently chosen
+                               return checkEquivalentFieldPositionsInKeyFields(
+                                               produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+                       }
+                       else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+                                       produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
+
+                               // both are range partitioned, check that partitioning fields are equivalently chosen
+                               return checkEquivalentFieldPositionsInKeyFields(
+                                               produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+                       }
+                       else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+                                       produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) {
+
+                               // both use a custom partitioner. Check that both keys are exactly as specified and that both use the same partitioner
+                               return produced1.getPartitioningFields().isExactMatch(this.keys1) &&
+                                               produced2.getPartitioningFields().isExactMatch(this.keys2) &&
+                                               produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null &&
+                                               produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner());
+
+                       }
+                       else {
+
+                               // no other partitioning is valid, incl. ANY_PARTITIONING.
+                               //   For joins we must ensure that both sides are exactly identically partitioned, ANY is not good enough.
+                               return false;
+                       }
+
                } else {
                        return true;
                }
@@ -151,4 +181,5 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
                gp.clearUniqueFieldCombinations();
                return gp;
        }
+
 }
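
Editor's note (illustration, not part of the patch): the per-strategy checks above matter for plans where both join inputs arrive hash-partitioned, but on fields that sit at different positions of the join keys. The pipeline below mirrors reuseBothPartitioningJoin2 from the PartitioningReusageTest added further down in this commit:

    DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
            .partitionByHash(0, 1).map(new MockMapper()).withForwardedFields("0;1")
            .join(set2.partitionByHash(1, 2).map(new MockMapper()).withForwardedFields("1;2"),
                    JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
            .where(0, 1).equalTo(2, 1).with(new MockJoin());

    // The left input is hash-partitioned on (0, 1), i.e. on its key fields in key order; the right
    // input is hash-partitioned on (1, 2), i.e. on its key fields (2, 1) in reversed order. A left
    // record with key values (x, y) is hashed on (x, y), its matching right record on (y, x), so
    // co-location of equal keys is not guaranteed. The old check (same PartitioningProperty, same
    // custom partitioner) accepted these properties; checkEquivalentFieldPositionsInKeyFields now
    // rejects them and the optimizer has to establish a compatible partitioning instead.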

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index ff4ca6e..f8d99f8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -99,27 +99,31 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 
        @Override
        protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
+
                if (this.customPartitioner == null) {
+
+                       // we accept compatible partitionings of any type
                        RequestedGlobalProperties partitioned_left_any = new 
RequestedGlobalProperties();
-                       RequestedGlobalProperties partitioned_left_hash = new 
RequestedGlobalProperties();
-                       partitioned_left_any.setAnyPartitioning(this.keys1);
-                       partitioned_left_hash.setHashPartitioned(this.keys1);
-                       
                        RequestedGlobalProperties partitioned_right_any = new 
RequestedGlobalProperties();
-                       RequestedGlobalProperties partitioned_right_hash = new 
RequestedGlobalProperties();
+                       partitioned_left_any.setAnyPartitioning(this.keys1);
                        partitioned_right_any.setAnyPartitioning(this.keys2);
+
+                       // add strict hash partitioning of both inputs on their 
full key sets
+                       RequestedGlobalProperties partitioned_left_hash = new 
RequestedGlobalProperties();
+                       RequestedGlobalProperties partitioned_right_hash = new 
RequestedGlobalProperties();
+                       partitioned_left_hash.setHashPartitioned(this.keys1);
                        partitioned_right_hash.setHashPartitioned(this.keys2);
-                       
+
                        return Arrays.asList(new 
GlobalPropertiesPair(partitioned_left_any, partitioned_right_any),
                                        new 
GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
                }
                else {
                        RequestedGlobalProperties partitioned_left = new 
RequestedGlobalProperties();
                        partitioned_left.setCustomPartitioned(this.keys1, 
this.customPartitioner);
-                       
+
                        RequestedGlobalProperties partitioned_right = new 
RequestedGlobalProperties();
                        partitioned_right.setCustomPartitioned(this.keys2, 
this.customPartitioner);
-                       
+
                        return Collections.singletonList(new 
GlobalPropertiesPair(partitioned_left, partitioned_right));
                }
        }
@@ -135,10 +139,40 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
        public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
                        GlobalProperties produced1, GlobalProperties produced2)
        {
-               return produced1.getPartitioning() == produced2.getPartitioning() && 
-                               (produced1.getCustomPartitioner() == null ? 
-                                       produced2.getCustomPartitioner() == null :
-                                       produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+
+               if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+                               produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+                       // both are hash partitioned, check that partitioning fields are equivalently chosen
+                       return checkEquivalentFieldPositionsInKeyFields(
+                                       produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+               }
+               else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+                               produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
+
+                       // both are range partitioned, check that partitioning fields are equivalently chosen
+                       return checkEquivalentFieldPositionsInKeyFields(
+                                       produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+               }
+               else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+                               produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) {
+
+                       // both use a custom partitioner. Check that both keys are exactly as specified and that both use the same partitioner
+                       return produced1.getPartitioningFields().isExactMatch(this.keys1) &&
+                                       produced2.getPartitioningFields().isExactMatch(this.keys2) &&
+                                       produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null &&
+                                       produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner());
+
+               }
+               else {
+
+                       // no other partitioning is valid, incl. ANY_PARTITIONING.
+                       //   For co-groups we must ensure that both sides are exactly identically partitioned, ANY is not good enough.
+                       return false;
+               }
+
        }
        
        @Override
@@ -150,12 +184,17 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
                Ordering prod1 = produced1.getOrdering();
                Ordering prod2 = produced2.getOrdering();
                
-               if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields ||
-                               prod2.getNumberOfFields() < prod2.getNumberOfFields())
-               {
+               if (prod1 == null || prod2 == null) {
                        throw new CompilerException("The given properties do not meet this operators requirements.");
                }
-                       
+
+               // check that order of fields is equivalent
+               if (!checkEquivalentFieldPositionsInKeyFields(
+                               prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+                       return false;
+               }
+
+               // check that order directions are equivalent
                for (int i = 0; i < numRelevantFields; i++) {
                        if (prod1.getOrder(i) != prod2.getOrder(i)) {
                                return false;
@@ -196,4 +235,5 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
                LocalProperties comb = LocalProperties.combine(in1, in2);
                return comb.clearUniqueFieldSets();
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
index 8eca16e..f72e6b5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
@@ -22,6 +22,7 @@ package org.apache.flink.compiler.operators;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.TwoInputNode;
 import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.LocalProperties;
@@ -81,7 +82,48 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
        public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2);
        
        public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2);
-       
+
+       protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) {
+
+               // check number of produced partitioning fields
+               if(fields1.size() != fields2.size()) {
+                       return false;
+               } else {
+                       return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size());
+               }
+       }
+
+       protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) {
+
+               // check number of produced partitioning fields
+               if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) {
+                       return false;
+               }
+               else {
+                       for(int i=0; i<numRelevantFields; i++) {
+                               int pField1 = fields1.get(i);
+                               int pField2 = fields2.get(i);
+                               // check that both produced fields sit at the same position within their respective key field lists
+                               int j;
+                               for(j=0; j<this.keys1.size(); j++) {
+                                       if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) {
+                                               break;
+                                       }
+                                       else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != pField2) {
+                                               // do nothing
+                                       }
+                                       else {
+                                               return false;
+                                       }
+                               }
+                               if(j == this.keys1.size()) {
+                                       throw new CompilerException("Fields were not found in key fields.");
+                               }
+                       }
+               }
+               return true;
+       }
+
        // --------------------------------------------------------------------------------------------
        
        public static final class GlobalPropertiesPair {
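
Editor's note (worked example, not part of the patch): for a join or co-group with keys1 = (0, 1) and keys2 = (2, 1), the helper above accepts produced partitionings whose fields occupy the same positions within the two key lists:

    // keys1 = (0, 1), keys2 = (2, 1)                      e.g. .where(0, 1).equalTo(2, 1)
    // fields1 = (0), fields2 = (2)  -> true:  field 0 and field 2 both sit at key position 0
    // fields1 = (0), fields2 = (1)  -> false: field 0 sits at key position 0, field 1 at key position 1,
    //                                         so the two partitionings need not co-locate matching keys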

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
index cd6094e..4ca82d5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
@@ -68,12 +68,17 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
                Ordering prod1 = produced1.getOrdering();
                Ordering prod2 = produced2.getOrdering();
                
-               if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields ||
-                               prod2.getNumberOfFields() < prod2.getNumberOfFields())
-               {
+               if (prod1 == null || prod2 == null) {
                        throw new CompilerException("The given properties do not meet this operators requirements.");
                }
-                       
+
+               // check that order of fields is equivalent
+               if (!checkEquivalentFieldPositionsInKeyFields(
+                               prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+                       return false;
+               }
+
+               // check that both inputs have the same directions of order
                for (int i = 0; i < numRelevantFields; i++) {
                        if (prod1.getOrder(i) != prod2.getOrder(i)) {
                                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index e3f5267..677d9be 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -211,7 +211,7 @@ public class FeedbackPropertiesMatchTest {
                                LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
                                
                                RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
+                               reqGp.setHashPartitioned(new FieldSet(2, 5));
                                
                                RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
                                reqLp.setGroupedFields(new FieldList(1));
@@ -375,7 +375,7 @@ public class FeedbackPropertiesMatchTest {
                                LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1, 2));
                                
                                RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
+                               reqGp.setHashPartitioned(new FieldSet(2, 5));
                                
                                RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
                                reqLp.setGroupedFields(new FieldList(1));
@@ -397,7 +397,7 @@ public class FeedbackPropertiesMatchTest {
                                LocalProperties lp = new LocalProperties();
                                
                                RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setHashPartitioned(new FieldList(2, 5));
+                               reqGp.setHashPartitioned(new FieldSet(2, 5));
                                
                                toMap1.setRequiredGlobalProps(null);
                                toMap1.setRequiredLocalProps(null);
@@ -434,7 +434,7 @@ public class FeedbackPropertiesMatchTest {
                                LocalProperties lp = 
LocalProperties.forGrouping(new FieldList(1));
                                
                                RequestedGlobalProperties reqGp = new 
RequestedGlobalProperties();
-                               reqGp.setAnyPartitioning(new FieldList(2, 5));
+                               reqGp.setAnyPartitioning(new FieldSet(2, 5));
                                
                                RequestedLocalProperties reqLp = new 
RequestedLocalProperties();
                                reqLp.setGroupedFields(new FieldList(1));

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java
new file mode 100644
index 0000000..c1ebd7b
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java
@@ -0,0 +1,859 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.dag.JoinNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class PartitioningReusageTest extends CompilerTestBase {
+
+       @Test
+       public void noPreviousPartitioningJoin1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                                       .where(0,1).equalTo(0,1).with(new 
MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+
+       }
+
+       @Test
+       public void noPreviousPartitioningJoin2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new 
MockMapper()).withForwardedFields("0;1")
+                               .join(set2, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(0,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new 
MockMapper()).withForwardedFields("0;1")
+                               .join(set2, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin3() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2.partitionByHash(2, 1)
+                                                       .map(new MockMapper())
+                                                       
.withForwardedFields("2;1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin4() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .join(set2, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin5() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2.partitionByHash(2)
+                                                       .map(new MockMapper())
+                                                       
.withForwardedFields("2"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin1() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new 
MockMapper()).withForwardedFields("0;1")
+                               .join(set2.partitionByHash(0,1)
+                                                       .map(new MockMapper())
+                                                       
.withForwardedFields("0;1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(0,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+
+       @Test
+       public void reuseBothPartitioningJoin2() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new 
MockMapper()).withForwardedFields("0;1")
+                               .join(set2.partitionByHash(1,2)
+                                                               .map(new 
MockMapper())
+                                                               
.withForwardedFields("1;2"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin3() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .join(set2.partitionByHash(2,1)
+                                                               .map(new 
MockMapper())
+                                                               
.withForwardedFields("2;1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin4() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,2)
+                               .map(new 
MockMapper()).withForwardedFields("0;2")
+                               .join(set2.partitionByHash(1)
+                                                               .map(new 
MockMapper())
+                                                               
.withForwardedFields("1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin5() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .join(set2.partitionByHash(1)
+                                                               .map(new 
MockMapper())
+                                                               
.withForwardedFields("1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin6() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .join(set2.partitionByHash(1)
+                                                               .map(new 
MockMapper())
+                                                               
.withForwardedFields("1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(1,2).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin7() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .join(set2.partitionByHash(1)
+                                                               .map(new 
MockMapper())
+                                                               
.withForwardedFields("1"),
+                                               
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(1,2).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = 
(DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+
+       @Test
+       public void noPreviousPartitioningCoGroup1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2)
+                               .where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+
+       }
+
+       @Test
+       public void noPreviousPartitioningCoGroup2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2)
+                               .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2)
+                               .where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2)
+                               .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup3() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2.partitionByHash(2, 1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2;1"))
+                               .where(0,1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup4() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .coGroup(set2)
+                               .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup5() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2.partitionByHash(2)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2"))
+                               .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2.partitionByHash(0, 1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("0;1"))
+                               .where(0, 1).equalTo(0, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+
+       @Test
+       public void reuseBothPartitioningCoGroup2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2.partitionByHash(1, 2)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1;2"))
+                               .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup3() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .coGroup(set2.partitionByHash(2, 1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2;1"))
+                               .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup4() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,2)
+                               .map(new MockMapper()).withForwardedFields("0;2")
+                               .coGroup(set2.partitionByHash(1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1"))
+                               .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup5() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .coGroup(set2.partitionByHash(1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1"))
+                               .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup6() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .coGroup(set2.partitionByHash(2)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2"))
+                               .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup7() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .coGroup(set2.partitionByHash(1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1"))
+                               .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+
+
+       private void checkValidJoinInputProperties(DualInputPlanNode join) {
+
+               GlobalProperties inProps1 = join.getInput1().getGlobalProperties();
+               GlobalProperties inProps2 = join.getInput2().getGlobalProperties();
+
+               if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+                               inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+                       // check that both inputs are hash partitioned on the same fields
+                       FieldList pFields1 = inProps1.getPartitioningFields();
+                       FieldList pFields2 = inProps2.getPartitioningFields();
+
+                       assertTrue("Inputs are not partitioned on the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+                                       pFields1.size() == pFields2.size());
+
+                       FieldList reqPFields1 = join.getKeysForInput1();
+                       FieldList reqPFields2 = join.getKeysForInput2();
+
+                       for(int i=0; i<pFields1.size(); i++) {
+
+                               // get fields
+                               int f1 = pFields1.get(i);
+                               int f2 = pFields2.get(i);
+
+                               // check that field positions in original key field list are identical
+                               int pos1 = getPosInFieldList(f1, reqPFields1);
+                               int pos2 = getPosInFieldList(f2, reqPFields2);
+
+                               if(pos1 < 0) {
+                                       fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+                               }
+                               if(pos2 < 0) {
+                                       fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+                               }
+                               if(pos1 != pos2) {
+                                       fail("Inputs are not partitioned on the same key fields");
+                               }
+                       }
+
+               }
+               else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION &&
+                               inProps2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+                       // we are good. No need to check for fields
+               }
+               else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED &&
+                               inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) {
+                       // we are good. No need to check for fields
+               }
+               else {
+                       throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned join inputs");
+               }
+
+       }
+
+       private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) {
+
+               GlobalProperties inProps1 = coGroup.getInput1().getGlobalProperties();
+               GlobalProperties inProps2 = coGroup.getInput2().getGlobalProperties();
+
+               if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+                               inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+                       // check that both inputs are hash partitioned on the same fields
+                       FieldList pFields1 = inProps1.getPartitioningFields();
+                       FieldList pFields2 = inProps2.getPartitioningFields();
+
+                       assertTrue("Inputs are not partitioned on the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+                                       pFields1.size() == pFields2.size());
+
+                       FieldList reqPFields1 = coGroup.getKeysForInput1();
+                       FieldList reqPFields2 = coGroup.getKeysForInput2();
+
+                       for(int i=0; i<pFields1.size(); i++) {
+
+                               // get fields
+                               int f1 = pFields1.get(i);
+                               int f2 = pFields2.get(i);
+
+                               // check that field positions in original key field list are identical
+                               int pos1 = getPosInFieldList(f1, reqPFields1);
+                               int pos2 = getPosInFieldList(f2, reqPFields2);
+
+                               if(pos1 < 0) {
+                                       fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+                               }
+                               if(pos2 < 0) {
+                                       fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+                               }
+                               if(pos1 != pos2) {
+                                       fail("Inputs are not partitioned on the same key fields");
+                               }
+                       }
+
+               }
+               else {
+                       throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
+               }
+
+       }
+
+       private int getPosInFieldList(int field, FieldList list) {
+
+               int pos;
+               for(pos=0; pos<list.size(); pos++) {
+                       if(field == list.get(pos)) {
+                               break;
+                       }
+               }
+               if(pos == list.size()) {
+                       return -1;
+               } else {
+                       return pos;
+               }
+
+       }
+
+
+
+       public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+               @Override
+               public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+                       Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+               @Override
+               public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+                       Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+               @Override
+               public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+                               Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+               }
+       }
+
+}
+
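For context, the property-reuse pattern that the tests above exercise looks roughly like the following sketch (hypothetical snippet; it reuses the IN_FILE constant and the mock functions defined in the test class). An input that is already hash-partitioned on its key fields and forwards them through the mapper should not be shuffled again:

    DataSet<Tuple3<Integer, Integer, Integer>> partitioned = set1
            .partitionByHash(0)                               // existing hash partitioning on field 0
            .map(new MockMapper()).withForwardedFields("0");  // field 0 survives the map, so the partitioning is preserved

    // the optimizer may reuse the existing partitioning of the first input
    // instead of inserting another partitioning step in front of the join
    partitioned.join(set2).where(0).equalTo(1).with(new MockJoin());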

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
index 1890597..f6ae01a 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
@@ -42,30 +42,34 @@ public class GlobalPropertiesMatchingTest {
                                GlobalProperties gp1 = new GlobalProperties();
                                gp1.setAnyPartitioning(new FieldList(2, 6));
                                assertTrue(req.isMetBy(gp1));
-                               
+
                                GlobalProperties gp2 = new GlobalProperties();
                                gp2.setAnyPartitioning(new FieldList(6, 2));
                                assertTrue(req.isMetBy(gp2));
-                               
+
                                GlobalProperties gp3 = new GlobalProperties();
-                               gp3.setAnyPartitioning(new FieldList(6, 1));
+                               gp3.setAnyPartitioning(new FieldList(6, 2, 4));
                                assertFalse(req.isMetBy(gp3));
-                               
+
                                GlobalProperties gp4 = new GlobalProperties();
-                               gp4.setAnyPartitioning(new FieldList(2));
-                               assertTrue(req.isMetBy(gp4));
+                               gp4.setAnyPartitioning(new FieldList(6, 1));
+                               assertFalse(req.isMetBy(gp4));
+
+                               GlobalProperties gp5 = new GlobalProperties();
+                               gp5.setAnyPartitioning(new FieldList(2));
+                               assertTrue(req.isMetBy(gp5));
                        }
-                       
+
                        // match hash partitioning
                        {
                                GlobalProperties gp1 = new GlobalProperties();
                                gp1.setHashPartitioned(new FieldList(2, 6));
                                assertTrue(req.isMetBy(gp1));
-                               
+
                                GlobalProperties gp2 = new GlobalProperties();
                                gp2.setHashPartitioned(new FieldList(6, 2));
                                assertTrue(req.isMetBy(gp2));
-                               
+
                                GlobalProperties gp3 = new GlobalProperties();
                                gp3.setHashPartitioned(new FieldList(6, 1));
                                assertFalse(req.isMetBy(gp3));
@@ -154,6 +158,136 @@ public class GlobalPropertiesMatchingTest {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testStrictlyMatchingAnyPartitioning() {
+
+               RequestedGlobalProperties req = new RequestedGlobalProperties();
+               req.setAnyPartitioning(new FieldList(6, 2));
+
+               // match any partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setAnyPartitioning(new FieldList(6, 2));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setAnyPartitioning(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setAnyPartitioning(new FieldList(6, 2, 3));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setAnyPartitioning(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp4));
+
+                       GlobalProperties gp5 = new GlobalProperties();
+                       gp5.setAnyPartitioning(new FieldList(2));
+                       assertFalse(req.isMetBy(gp5));
+               }
+
+               // match hash partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setHashPartitioned(new FieldList(6, 2));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setHashPartitioned(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setHashPartitioned(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp3));
+               }
+
+               // match range partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+       }
+
+       @Test
+       public void testStrictlyMatchingHashPartitioning() {
+
+               RequestedGlobalProperties req = new RequestedGlobalProperties();
+               req.setHashPartitioned(new FieldList(6, 2));
+
+               // match any partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setAnyPartitioning(new FieldList(6, 2));
+                       assertFalse(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setAnyPartitioning(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setAnyPartitioning(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setAnyPartitioning(new FieldList(2));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+               // match hash partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setHashPartitioned(new FieldList(6, 2));
+                       assertTrue(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setHashPartitioned(new FieldList(2, 6));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setHashPartitioned(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setHashPartitioned(new FieldList(6, 2, 0));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+               // match range partitioning
+               {
+                       GlobalProperties gp1 = new GlobalProperties();
+                       gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp1));
+
+                       GlobalProperties gp2 = new GlobalProperties();
+                       gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp2));
+
+                       GlobalProperties gp3 = new GlobalProperties();
+                       gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+                       assertFalse(req.isMetBy(gp3));
+
+                       GlobalProperties gp4 = new GlobalProperties();
+                       gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+                       assertFalse(req.isMetBy(gp4));
+               }
+
+       }
        
        // --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
index 3f9c0db..0bb72b8 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -423,7 +423,7 @@ public class RequestedGlobalPropertiesFilteringTest {
                SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
 
                RequestedGlobalProperties gprops = new RequestedGlobalProperties();
-               gprops.setHashPartitioned(new FieldList(0,1));
+               gprops.setHashPartitioned(new FieldSet(0,1));
 
                gprops.filterBySemanticProperties(sprops, 1);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index 1a76d49..ae0a722 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -117,7 +117,7 @@ public class FieldList extends FieldSet {
        public FieldList toFieldList() {
                return this;
        }
-       
+
        // --------------------------------------------------------------------------------------------
        
        @Override
@@ -158,6 +158,19 @@ public class FieldList extends FieldSet {
                }
                return true;
        }
+
+       public boolean isExactMatch(FieldList list) {
+               if (this.size() != list.size()) {
+                       return false;
+               } else {
+                       for (int i = 0; i < this.size(); i++) {
+                               if (this.get(i) != list.get(i)) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+       }
        
        // --------------------------------------------------------------------------------------------
 

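For illustration, the new FieldList.isExactMatch is both size- and order-sensitive, in contrast to subset-style checks (hypothetical usage; the field values are chosen freely):

    FieldList fields = new FieldList(6, 2);
    fields.isExactMatch(new FieldList(6, 2));    // true: same fields in the same order
    fields.isExactMatch(new FieldList(2, 6));    // false: same fields, different order
    fields.isExactMatch(new FieldList(6, 2, 4)); // false: different number of fields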