[FLINK-1443] Added support for replicated data sources.
Introduced a new PartitioningProperty (ANY_DISTRIBUTION) that permits any data
distribution, i.e., random partitioning or full replication.
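
For reference, a minimal usage sketch distilled from the ReplicatingDataSourceTest
cases added in this commit (paths are placeholders):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Wrap a regular input format. Every parallel source task then reads the
    // complete input instead of an individual split.
    ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
            new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
                    new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));

    DataSet<Tuple1<String>> replicated = env.createInput(rif,
            new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));

    // A join against the replicated side can be executed with a FORWARD ship
    // strategy on both inputs, i.e., without a network shuffle.
    replicated.join(env.readCsvFile("/some/otherpath").types(String.class))
            .where("*").equalTo("*")
            .writeAsText("/some/newpath");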


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19b4a02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19b4a02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19b4a02

Branch: refs/heads/master
Commit: a19b4a02bfa5237e0dcd2b264da36229546f23c0
Parents: 7452802
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Feb 3 16:03:07 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Feb 5 11:18:04 2015 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/DataSourceNode.java      |  41 +-
 .../flink/compiler/dag/SingleInputNode.java     |  24 +-
 .../apache/flink/compiler/dag/TwoInputNode.java |  33 ++
 .../dataproperties/GlobalProperties.java        |  17 +-
 .../dataproperties/PartitioningProperty.java    |  11 +-
 .../RequestedGlobalProperties.java              |  35 +-
 .../operators/AbstractJoinDescriptor.java       |   2 +-
 .../operators/AllGroupReduceProperties.java     |   2 +-
 .../AllGroupWithPartialPreGroupProperties.java  |   2 +-
 .../operators/CartesianProductDescriptor.java   |   2 +-
 .../compiler/operators/CoGroupDescriptor.java   |   2 +-
 .../operators/CollectorMapDescriptor.java       |   6 +-
 .../compiler/operators/FilterDescriptor.java    |   4 +-
 .../compiler/operators/FlatMapDescriptor.java   |   6 +-
 .../operators/GroupReduceProperties.java        |   2 +-
 .../GroupReduceWithCombineProperties.java       |   2 +-
 .../flink/compiler/operators/MapDescriptor.java |   4 +-
 .../operators/MapPartitionDescriptor.java       |   4 +-
 .../operators/OperatorDescriptorSingle.java     |   8 +-
 .../operators/PartialGroupProperties.java       |   2 +-
 .../compiler/operators/ReduceProperties.java    |   2 +-
 .../apache/flink/compiler/DOPChangeTest.java    |  10 +-
 .../compiler/ReplicatingDataSourceTest.java     | 495 +++++++++++++++++++
 .../GlobalPropertiesFilteringTest.java          |  12 +-
 .../RequestedGlobalPropertiesFilteringTest.java |   2 +-
 .../api/common/io/ReplicatingInputFormat.java   | 115 +++++
 .../io/ReplicatingInputSplitAssigner.java       |  79 +++
 .../ReplicatingDataSourceITCase.java            | 141 ++++++
 28 files changed, 1006 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index 10c77ca..af2a92b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
@@ -48,6 +49,8 @@ public class DataSourceNode extends OptimizerNode {
        
        private final boolean sequentialInput;
 
+       private final boolean replicatedInput;
+
        /**
         * Creates a new DataSourceNode for the given contract.
         * 
@@ -67,6 +70,12 @@ public class DataSourceNode extends OptimizerNode {
                } else {
                        this.sequentialInput = false;
                }
+
+               if (pactContract.getUserCodeWrapper().getUserCodeObject() instanceof ReplicatingInputFormat) {
+                       this.replicatedInput = true;
+               } else {
+                       this.replicatedInput = false;
+               }
        }
 
        /**
@@ -174,17 +183,31 @@ public class DataSourceNode extends OptimizerNode {
                if (this.cachedPlans != null) {
                        return this.cachedPlans;
                }
-               
+
                SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")");
-               candidate.updatePropertiesWithUniqueSets(getUniqueFields());
-               
-               final Costs costs = new Costs();
-               if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) &&
-                               this.estimatedOutputSize >= 0)
-               {
-                       estimator.addFileInputCost(this.estimatedOutputSize, costs);
+
+               if(!replicatedInput) {
+                       candidate.updatePropertiesWithUniqueSets(getUniqueFields());
+
+                       final Costs costs = new Costs();
+                       if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) &&
+                                       this.estimatedOutputSize >= 0) {
+                               estimator.addFileInputCost(this.estimatedOutputSize, costs);
+                       }
+                       candidate.setCosts(costs);
+               } else {
+                       // replicated input
+                       final Costs costs = new Costs();
+                       InputFormat<?,?> inputFormat =
+                                       ((ReplicatingInputFormat<?,?>)getPactContract().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
+                       if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
+                                       this.estimatedOutputSize >= 0) {
+                               estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs);
+                       }
+                       candidate.setCosts(costs);
+
+                       candidate.getGlobalProperties().setFullyReplicated();
                }
-               candidate.setCosts(costs);
 
                // since there is only a single plan for the data-source, return a list with that element only
                List<PlanNode> plans = new ArrayList<PlanNode>(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
index e7e82fc..70c4291 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
@@ -240,6 +240,8 @@ public abstract class SingleInputNode extends OptimizerNode {
                        return this.cachedPlans;
                }
 
+               boolean childrenSkippedDueToReplicatedInput = false;
+
                // calculate alternative sub-plans for predecessor
                final List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
                final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();
@@ -279,6 +281,18 @@ public abstract class SingleInputNode extends OptimizerNode {
 
                // create all candidates
                for (PlanNode child : subPlans) {
+
+                       if(child.getGlobalProperties().isFullyReplicated()) {
+                               // fully replicated input is always locally forwarded if DOP is not changed
+                               if(dopChange) {
+                                       // can not continue with this child
+                                       childrenSkippedDueToReplicatedInput = true;
+                                       continue;
+                               } else {
+                                       this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
+                               }
+                       }
+
                        if (this.inConn.getShipStrategy() == null) {
                                // pick the strategy ourselves
                                for (RequestedGlobalProperties igps: intGlobal) {
@@ -325,7 +339,15 @@ public abstract class SingleInputNode extends OptimizerNode {
                                }
                        }
                }
-               
+
+               if(outputPlans.isEmpty()) {
+                       if(childrenSkippedDueToReplicatedInput) {
+                               throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input.");
+                       } else {
+                               throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
+                       }
+               }
+
                // cost and prune the plans
                for (PlanNode node : outputPlans) {
                        estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index 5e9a980..b391890 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -300,6 +300,8 @@ public abstract class TwoInputNode extends OptimizerNode {
                        return this.cachedPlans;
                }
 
+               boolean childrenSkippedDueToReplicatedInput = false;
+
                // step down to all producer nodes and calculate alternative plans
                final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
                final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
@@ -353,7 +355,30 @@ public abstract class TwoInputNode extends OptimizerNode {
                
                // create all candidates
                for (PlanNode child1 : subPlans1) {
+
+                       if(child1.getGlobalProperties().isFullyReplicated()) {
+                               // fully replicated input is always locally forwarded if DOP is not changed
+                               if(dopChange1) {
+                                       // can not continue with this child
+                                       childrenSkippedDueToReplicatedInput = true;
+                                       continue;
+                               } else {
+                                       this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+                               }
+                       }
+
                        for (PlanNode child2 : subPlans2) {
+
+                               if(child2.getGlobalProperties().isFullyReplicated()) {
+                                       // fully replicated input is always locally forwarded if DOP is not changed
+                                       if(dopChange2) {
+                                               // can not continue with this child
+                                               childrenSkippedDueToReplicatedInput = true;
+                                               continue;
+                                       } else {
+                                               this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+                                       }
+                               }
                                
                                // check that the children go together. that is the case if they build upon the same
                                // candidate at the joined branch plan. 
@@ -457,6 +482,14 @@ public abstract class TwoInputNode extends OptimizerNode {
                        }
                }
 
+               if(outputPlans.isEmpty()) {
+                       if(childrenSkippedDueToReplicatedInput) {
+                               throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input.");
+                       } else {
+                               throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
+                       }
+               }
+
                // cost and prune the plans
                for (PlanNode node : outputPlans) {
                        estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index fb1f1a2..ca7e64d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -61,7 +61,7 @@ public class GlobalProperties implements Cloneable {
         * Initializes the global properties with no partitioning.
         */
        public GlobalProperties() {
-               this.partitioning = PartitioningProperty.RANDOM;
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
        }
        
        // --------------------------------------------------------------------------------------------
@@ -102,8 +102,8 @@ public class GlobalProperties implements Cloneable {
                this.ordering = null;
        }
        
-       public void setRandomDistribution() {
-               this.partitioning = PartitioningProperty.RANDOM;
+       public void setRandomPartitioned() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
                this.partitioningFields = null;
                this.ordering = null;
        }
@@ -224,14 +224,14 @@ public class GlobalProperties implements Cloneable {
         * Checks, if the properties in this object are trivial, i.e. only standard values.
         */
        public boolean isTrivial() {
-               return partitioning == PartitioningProperty.RANDOM;
+               return partitioning == PartitioningProperty.RANDOM_PARTITIONED;
        }
 
        /**
         * This method resets the properties to a state where no properties are given.
         */
        public void reset() {
-               this.partitioning = PartitioningProperty.RANDOM;
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
                this.ordering = null;
                this.partitioningFields = null;
        }
@@ -254,8 +254,6 @@ public class GlobalProperties implements Cloneable {
 
                // filter partitioning
                switch(this.partitioning) {
-                       case FULL_REPLICATION:
-                               return gp;
                        case RANGE_PARTITIONED:
                                // check if ordering is preserved
                                Ordering newOrdering = new Ordering();
@@ -308,7 +306,8 @@ public class GlobalProperties implements Cloneable {
                                }
                                break;
                        case FORCED_REBALANCED:
-                       case RANDOM:
+                       case FULL_REPLICATION:
+                       case RANDOM_PARTITIONED:
                                gp.partitioning = this.partitioning;
                                break;
                        default:
@@ -350,7 +349,7 @@ public class GlobalProperties implements Cloneable {
 
        public void parameterizeChannel(Channel channel, boolean globalDopChange) {
                switch (this.partitioning) {
-                       case RANDOM:
+                       case RANDOM_PARTITIONED:
                                channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
                                break;
                        case FULL_REPLICATION:

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
index 47cd6b8..2b66ea0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
@@ -22,11 +22,16 @@ package org.apache.flink.compiler.dataproperties;
  * An enumeration tracking the different types of sharding strategies.
  */
 public enum PartitioningProperty {
-       
+
+       /**
+        * Any data distribution, i.e., random partitioning or full replication.
+        */
+       ANY_DISTRIBUTION,
+
        /**
         * Constant indicating no particular partitioning (i.e. random) data distribution.
         */
-       RANDOM,
+       RANDOM_PARTITIONED,
 
        /**
         * Constant indicating a hash partitioning.
@@ -85,7 +90,7 @@ public enum PartitioningProperty {
         * @return True, if the data is partitioned on a key.
         */
        public boolean isPartitionedOnKey() {
-               return isPartitioned() && this != RANDOM;
+               return isPartitioned() && this != RANDOM_PARTITIONED;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index f304bf6..daaa7dc 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -53,7 +53,7 @@ public final class RequestedGlobalProperties implements Cloneable {
         * Initializes the global properties with no partitioning.
         */
        public RequestedGlobalProperties() {
-               this.partitioning = PartitioningProperty.RANDOM;
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
        }
        
        // --------------------------------------------------------------------------------------------
@@ -96,8 +96,14 @@ public final class RequestedGlobalProperties implements Cloneable {
                this.ordering = null;
        }
        
-       public void setRandomDistribution() {
-               this.partitioning = PartitioningProperty.RANDOM;
+       public void setRandomPartitioning() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+
+       public void setAnyDistribution() {
+               this.partitioning = PartitioningProperty.ANY_DISTRIBUTION;
                this.partitioningFields = null;
                this.ordering = null;
        }
@@ -174,14 +180,14 @@ public final class RequestedGlobalProperties implements Cloneable {
         * Checks, if the properties in this object are trivial, i.e. only standard values.
         */
        public boolean isTrivial() {
-               return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM;
+               return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM_PARTITIONED;
        }
 
        /**
         * This method resets the properties to a state where no properties are given.
         */
        public void reset() {
-               this.partitioning = PartitioningProperty.RANDOM;
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
                this.ordering = null;
                this.partitioningFields = null;
                this.dataDistribution = null;
@@ -208,7 +214,8 @@ public final class RequestedGlobalProperties implements Cloneable {
                        case FULL_REPLICATION:
                        case FORCED_REBALANCED:
                        case CUSTOM_PARTITIONING:
-                       case RANDOM:
+                       case RANDOM_PARTITIONED:
+                       case ANY_DISTRIBUTION:
                                // make sure that certain properties are not pushed down
                                return null;
                        case HASH_PARTITIONED:
@@ -255,13 +262,15 @@ public final class RequestedGlobalProperties implements Cloneable {
         * @return True, if the properties are met, false otherwise.
         */
        public boolean isMetBy(GlobalProperties props) {
-               if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
+               if (this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
+                       return true;
+               } else if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
                        return props.isFullyReplicated();
                }
                else if (props.isFullyReplicated()) {
                        return false;
                }
-               else if (this.partitioning == PartitioningProperty.RANDOM) {
+               else if (this.partitioning == PartitioningProperty.RANDOM_PARTITIONED) {
                        return true;
                }
                else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
@@ -295,9 +304,17 @@ public final class RequestedGlobalProperties implements Cloneable {
         * @param globalDopChange
         */
        public void parameterizeChannel(Channel channel, boolean globalDopChange) {
+
+               // safety check. Fully replicated input must be preserved.
+               if(channel.getSource().getGlobalProperties().isFullyReplicated() &&
+                               this.partitioning != PartitioningProperty.FULL_REPLICATION &&
+                               this.partitioning != PartitioningProperty.ANY_DISTRIBUTION) {
+                       throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property.");
+               }
+
                // if we request nothing, then we need no special strategy. forward, if the number of instances remains
                // the same, randomly repartition otherwise
-               if (isTrivial()) {
+               if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
                        channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
                        return;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index d8f7746..b1c3079 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -144,7 +144,7 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
        public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
                GlobalProperties gp = GlobalProperties.combine(in1, in2);
                if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 &&
-                                       gp.getPartitioning() == PartitioningProperty.RANDOM)
+                                       gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
index 0390c06..2d74bbe 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
@@ -57,7 +57,7 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle {
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index 2c2ddf1..54885a7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -79,7 +79,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
index fefd71a..cca0bb0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
@@ -100,7 +100,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
        public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
                GlobalProperties gp = GlobalProperties.combine(in1, in2);
                if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 &&
-                                       gp.getPartitioning() == PartitioningProperty.RANDOM)
+                                       gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index bc83c51..ff4ca6e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -183,7 +183,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
        public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
                GlobalProperties gp = GlobalProperties.combine(in1, in2);
                if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 &&
-                                       gp.getPartitioning() == PartitioningProperty.RANDOM)
+                                       gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
index 23c32a7..6007709 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
@@ -47,7 +47,9 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
 
        @Override
        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               return Collections.singletonList(new RequestedGlobalProperties());
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
        }
 
        @Override
@@ -58,7 +60,7 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
index b7ee761..0b4f373 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
@@ -46,7 +46,9 @@ public class FilterDescriptor extends OperatorDescriptorSingle {
 
        @Override
        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               return Collections.singletonList(new RequestedGlobalProperties());
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
index 66993c4..6b06232 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
@@ -47,7 +47,9 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
 
        @Override
        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               return Collections.singletonList(new RequestedGlobalProperties());
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
        }
 
        @Override
@@ -58,7 +60,7 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
index ab93170..8d52503 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
@@ -114,7 +114,7 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
index 8604951..fd263e6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
@@ -146,7 +146,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
index cb1d258..673716d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
@@ -46,7 +46,9 @@ public class MapDescriptor extends OperatorDescriptorSingle {
 
        @Override
        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               return Collections.singletonList(new RequestedGlobalProperties());
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
index f0af88b..dc67321 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
@@ -46,7 +46,9 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle {
 
        @Override
        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               return Collections.singletonList(new RequestedGlobalProperties());
+               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+               rgp.setAnyDistribution();
+               return Collections.singletonList(rgp);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
index 45daceb..7919b2b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
@@ -38,7 +38,7 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
        
        protected final FieldSet keys;                  // the set of key fields
        protected final FieldList keyList;              // the key fields with ordered field positions
-       
+
        private List<RequestedGlobalProperties> globalProps;
        private List<RequestedLocalProperties> localProps;
        
@@ -51,8 +51,8 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
                this.keys = keys;
                this.keyList = keys == null ? null : keys.toFieldList();
        }
-       
-       
+
+
        public List<RequestedGlobalProperties> getPossibleGlobalProperties() {
                if (this.globalProps == null) {
                        this.globalProps = createPossibleGlobalProperties();
@@ -66,7 +66,7 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
                }
                return this.localProps;
        }
-       
+
        /**
         * Returns a list of global properties that are required by this operator descriptor.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
index cf33bbe..7954773 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
@@ -76,7 +76,7 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
index 813af20..000079d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
@@ -103,7 +103,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
        @Override
        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
                if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM)
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
                {
                        gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index c3a4c3a..c90a89b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.compiler;
 
+import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.junit.Assert;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.operators.FileDataSink;
@@ -257,8 +258,15 @@ public class DOPChangeTest extends CompilerTestBase {
                // mapper respectively reducer
                SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
                SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+               SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+               Assert.assertTrue("No sort local strategy found.",
+                               LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+                                               LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
 
-               Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy());
+               Assert.assertTrue("No hash partitioning ship strategy found.",
+                               ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+                                               ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java
new file mode 100644
index 0000000..1cbac25
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReplicatingDataSourceTest extends CompilerTestBase {
+
+       /**
+        * Tests join program with replicated data source.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInput() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               // the join should have a forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests join program with replicated data source behind map.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindMap() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .map(new IdMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               // the join should have a forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests join program with replicated data source behind filter.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindFilter() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .filter(new NoFilter())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               // the join should have a forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests join program with replicated data source behind flatMap.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .flatMap(new IdFlatMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               // when join should have forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests join program with replicated data source behind map partition.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .mapPartition(new IdPMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have a forward ship strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests join program with replicated data source behind multiple map ops.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .filter(new NoFilter())
+                               .mapPartition(new IdPMap())
+                               .flatMap(new IdFlatMap())
+                               .map(new IdMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have a forward ship strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests cross program with replicated data source.
+        */
+       @Test
+       public void checkCrossWithReplicatedSourceInput() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .cross(source2)
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the cross should have a forward ship strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy();
+               ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2);
+       }
+
+       /**
+        * Tests cross program with replicated data source behind map and filter.
+        */
+       @Test
+       public void checkCrossWithReplicatedSourceInputBehindMap() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .map(new IdMap())
+                               .filter(new NoFilter())
+                               .cross(source2)
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the cross should have a forward ship strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+               ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy();
+               ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2);
+       }
+
+       /**
+        * Tests that the compiler fails for a join program with replicated data source and changing DOP.
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputChangingDOP() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .join(source2).where("*").equalTo("*").setParallelism(DEFAULT_PARALLELISM + 2)
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler; compilation must fail
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+       /**
+        * Tests that the compiler fails for a join program with replicated data source behind map and changing DOP.
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() {
+
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .map(new IdMap()).setParallelism(DEFAULT_PARALLELISM + 1)
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler; compilation must fail
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+       /**
+        * Tests that the compiler fails for a join program with replicated data source behind reduce.
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputBehindReduce() {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .reduce(new LastReduce())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler; compilation must fail
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+       /**
+        * Tests that the compiler fails for a join program with replicated data source behind rebalance.
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputBehindRebalance() {
+               ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .rebalance()
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler; compilation must fail
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+
+       public static class IdMap<T> implements MapFunction<T,T> {
+
+               @Override
+               public T map(T value) throws Exception {
+                       return value;
+               }
+       }
+
+       public static class NoFilter<T> implements FilterFunction<T> {
+
+               @Override
+               public boolean filter(T value) throws Exception {
+                       return false;
+               }
+       }
+
+       public static class IdFlatMap<T> implements FlatMapFunction<T,T> {
+
+               @Override
+               public void flatMap(T value, Collector<T> out) throws Exception {
+                       out.collect(value);
+               }
+       }
+
+       public static class IdPMap<T> implements MapPartitionFunction<T,T> {
+
+               @Override
+               public void mapPartition(Iterable<T> values, Collector<T> out) throws Exception {
+                       for (T v : values) {
+                               out.collect(v);
+                       }
+               }
+       }
+
+       public static class LastReduce<T> implements ReduceFunction<T> {
+
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+                       return value2;
+               }
+       }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
index f8e2242..ff7530a 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
@@ -59,7 +59,7 @@ public class GlobalPropertiesFilteringTest {
 
                GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
 
-               assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
                assertNull(result.getPartitioningFields());
                assertNull(result.getPartitioningOrdering());
                assertNull(result.getUniqueFieldCombination());
@@ -78,7 +78,7 @@ public class GlobalPropertiesFilteringTest {
 
                GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
 
-               assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
                assertNull(result.getPartitioningFields());
                assertNull(result.getPartitioningOrdering());
                assertNull(result.getUniqueFieldCombination());
@@ -133,7 +133,7 @@ public class GlobalPropertiesFilteringTest {
 
                GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-               assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
                assertNull(result.getPartitioningFields());
        }
 
@@ -186,7 +186,7 @@ public class GlobalPropertiesFilteringTest {
 
                GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-               assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
                assertNull(result.getPartitioningFields());
        }
 
@@ -242,7 +242,7 @@ public class GlobalPropertiesFilteringTest {
 
                GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-               assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
                assertNull(result.getPartitioningFields());
                assertNull(result.getCustomPartitioner());
        }
@@ -330,7 +330,7 @@ public class GlobalPropertiesFilteringTest {
 
                GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-               assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+               assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
                assertNull(result.getPartitioningOrdering());
                assertNull(result.getPartitioningFields());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
index e094640..3f9c0db 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -345,7 +345,7 @@ public class RequestedGlobalPropertiesFilteringTest {
                SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
 
                RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
-               rgProps.setRandomDistribution();
+               rgProps.setRandomPartitioning();
 
                RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
new file mode 100644
index 0000000..0adccaf
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import java.io.IOException;
+
+/**
+ * A ReplicatingInputFormat replicates any {@link InputFormat} to all parallel instances of a DataSource,
+ * i.e., the full input of the replicated InputFormat is completely processed by each parallel instance of the DataSource.
+ * This is done by assigning all {@link org.apache.flink.core.io.InputSplit}s generated by the
+ * replicated InputFormat to each parallel instance.
+ *
+ * Replicated data can only be used as input for a {@link org.apache.flink.api.common.operators.base.JoinOperatorBase} or
+ * {@link org.apache.flink.api.common.operators.base.CrossOperatorBase} with the same degree of parallelism as the DataSource.
+ * Before being used as an input to a Join or Cross operator, replicated data might be processed in local pipelines by
+ * Map-based operators with the same degree of parallelism as the source. Map-based operators are
+ * {@link org.apache.flink.api.common.operators.base.MapOperatorBase},
+ * {@link org.apache.flink.api.common.operators.base.FlatMapOperatorBase},
+ * {@link org.apache.flink.api.common.operators.base.FilterOperatorBase}, and
+ * {@link org.apache.flink.api.common.operators.base.MapPartitionOperatorBase}.
+ *
+ * Replicated DataSources can be used for local join processing (no data shipping) if one input is accessible on all
+ * parallel instances of a join and the other input is (randomly) partitioned across all parallel instances.
+ *
+ * However, a replicated DataSource is a plan hint that can invalidate a Flink program if not used correctly (see
+ * usage instructions above). In such situations, the optimizer is not able to generate a valid execution plan and
+ * the program execution will fail.
+ *
+ * @param <OT> The output type of the wrapped InputFormat.
+ * @param <S> The InputSplit type of the wrapped InputFormat.
+ *
+ * @see org.apache.flink.api.common.io.InputFormat
+ * @see org.apache.flink.api.common.operators.base.JoinOperatorBase
+ * @see org.apache.flink.api.common.operators.base.CrossOperatorBase
+ * @see org.apache.flink.api.common.operators.base.MapOperatorBase
+ * @see org.apache.flink.api.common.operators.base.FlatMapOperatorBase
+ * @see org.apache.flink.api.common.operators.base.FilterOperatorBase
+ * @see org.apache.flink.api.common.operators.base.MapPartitionOperatorBase
+ */
+public final class ReplicatingInputFormat<OT, S extends InputSplit> implements InputFormat<OT, S> {
+
+       private static final long serialVersionUID = 1L;
+
+       private InputFormat<OT, S> replicatedIF;
+
+       public ReplicatingInputFormat(InputFormat<OT, S> wrappedIF) {
+               this.replicatedIF = wrappedIF;
+       }
+
+       public InputFormat<OT, S> getReplicatedInputFormat() {
+               return this.replicatedIF;
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+               this.replicatedIF.configure(parameters);
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+               return this.replicatedIF.getStatistics(cachedStatistics);
+       }
+
+       @Override
+       public S[] createInputSplits(int minNumSplits) throws IOException {
+               return this.replicatedIF.createInputSplits(minNumSplits);
+       }
+
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(S[] inputSplits) {
+               return new ReplicatingInputSplitAssigner(inputSplits);
+       }
+
+       @Override
+       public void open(S split) throws IOException {
+               this.replicatedIF.open(split);
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return this.replicatedIF.reachedEnd();
+       }
+
+       @Override
+       public OT nextRecord(OT reuse) throws IOException {
+               return this.replicatedIF.nextRecord(reuse);
+       }
+
+       @Override
+       public void close() throws IOException {
+               this.replicatedIF.close();
+       }
+}
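
For reference, a minimal end-to-end sketch of how the new format is wired up, distilled from the tests in this commit. The class name is hypothetical and the paths are placeholders; any InputFormat can be wrapped, provided the source, any map-based operators in between, and the Join/Cross all run with the same degree of parallelism:

    import org.apache.flink.api.common.io.ReplicatingInputFormat;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.CsvInputFormat;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    // Sketch only; class name and paths are placeholders.
    public class ReplicatedJoinExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // wrap any InputFormat; every parallel source instance then reads ALL of its splits
            ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
                    new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
                            new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));

            DataSet<Tuple1<String>> replicated = env.createInput(rif,
                    new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
            DataSet<Tuple1<String>> partitioned = env.readCsvFile("/some/otherpath").types(String.class);

            // the replicated side is complete on every instance, so the optimizer can use
            // FORWARD ship strategies on both inputs and join without any network shuffle
            replicated.join(partitioned).where("*").equalTo("*")
                    .writeAsText("/some/newpath");

            env.execute();
        }
    }

If the join's parallelism deviated from the source's (e.g. via setParallelism), the optimizer could not produce a valid plan and compilation would fail with a CompilerException, as the negative tests above exercise.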

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
new file mode 100644
index 0000000..315fbcd
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Assigns each InputSplit to each requesting parallel instance.
+ * This causes the input to be fully replicated, i.e., each parallel instance consumes the full input.
+ */
+public class ReplicatingInputSplitAssigner implements InputSplitAssigner {
+
+       private InputSplit[] inputSplits;
+
+       private int[] assignCounts;
+
+       public ReplicatingInputSplitAssigner(Collection<InputSplit> splits) {
+               this.inputSplits = new InputSplit[splits.size()];
+               this.inputSplits = splits.toArray(this.inputSplits);
+               this.assignCounts = new int[32];
+               Arrays.fill(assignCounts, 0);
+       }
+
+       public ReplicatingInputSplitAssigner(InputSplit[] splits) {
+               this.inputSplits = splits;
+               this.assignCounts = new int[32];
+               Arrays.fill(assignCounts, 0);
+       }
+
+       @Override
+       public InputSplit getNextInputSplit(String host, int taskId) {
+
+               // look up how many splits have already been assigned to this task
+               int assignCnt;
+               if (taskId < this.assignCounts.length) {
+                       assignCnt = this.assignCounts[taskId];
+               } else {
+                       // grow the counts array so it covers this task id
+                       // (Arrays.copyOf zero-fills the new tail)
+                       int newSize = this.assignCounts.length * 2;
+                       if (taskId >= newSize) {
+                               newSize = taskId + 1;
+                       }
+                       this.assignCounts = Arrays.copyOf(this.assignCounts, newSize);
+                       assignCnt = 0;
+               }
+
+               if (assignCnt >= inputSplits.length) {
+                       // all splits have been assigned to this task
+                       return null;
+               } else {
+                       // return the next split and remember the assignment
+                       InputSplit is = inputSplits[assignCnt];
+                       assignCounts[taskId] = assignCnt + 1;
+                       return is;
+               }
+       }
+}
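
A small behavioral sketch of the assigner's contract: each requesting task id is handed every split exactly once, in order, and then null. The split objects and the two task ids are hypothetical values; GenericInputSplit merely stands in for any InputSplit implementation (its constructor is assumed here to take the split number and total split count):

    import org.apache.flink.api.common.io.ReplicatingInputSplitAssigner;
    import org.apache.flink.core.io.GenericInputSplit;
    import org.apache.flink.core.io.InputSplit;

    // Sketch only; three splits and two tasks are hypothetical values.
    public class ReplicatingAssignerSketch {

        public static void main(String[] args) {
            InputSplit[] splits = new InputSplit[] {
                    new GenericInputSplit(0, 3),
                    new GenericInputSplit(1, 3),
                    new GenericInputSplit(2, 3)
            };
            ReplicatingInputSplitAssigner assigner = new ReplicatingInputSplitAssigner(splits);

            // every task receives all three splits, then null to signal completion
            for (int taskId = 0; taskId < 2; taskId++) {
                InputSplit next;
                while ((next = assigner.getNextInputSplit("localhost", taskId)) != null) {
                    System.out.println("task " + taskId + " got split " + next.getSplitNumber());
                }
            }
        }
    }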

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
new file mode 100644
index 0000000..85e3e11
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests for replicating DataSources.
+ */
+@RunWith(Parameterized.class)
+public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
+
+       public ReplicatingDataSourceITCase(MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+       private String resultPath;
+
+       private String expectedResult;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception {
+               // 500500 = 0+1+2+3+...+999+1000
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testReplicatedSourceToJoin() throws Exception {
+               /*
+                * Test replicated source going into join
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple1<Long>> source1 = env.createInput(
+                               new ReplicatingInputFormat<Long, GenericInputSplit>(
+                                               new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0L, 1000L))),
+                               BasicTypeInfo.LONG_TYPE_INFO)
+                               .map(new ToTuple());
+               DataSet<Tuple1<Long>> source2 = env.generateSequence(0L, 1000L).map(new ToTuple());
+
+               DataSet<Tuple> pairs = source1.join(source2).where(0).equalTo(0)
+                               .projectFirst(0)
+                               .sum(0);
+
+               pairs.writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "(500500)";
+
+       }
+
+       @Test
+       public void testReplicatedSourceToCross() throws Exception {
+               /*
+                * Test replicated source going into cross
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple1<Long>> source1 = env.createInput(
+                               new ReplicatingInputFormat<Long, GenericInputSplit>(
+                                               new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0L, 1000L))),
+                               BasicTypeInfo.LONG_TYPE_INFO)
+                               .map(new ToTuple());
+               DataSet<Tuple1<Long>> source2 = env.generateSequence(0L, 1000L).map(new ToTuple());
+
+               DataSet<Tuple1<Long>> pairs = source1.cross(source2)
+                               .filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() {
+                                       @Override
+                                       public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
+                                               return value.f0.f0.equals(value.f1.f0);
+                                       }
+                               })
+                               .map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() {
+                                       @Override
+                                       public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
+                                               return value.f0;
+                                       }
+                               })
+                               .sum(0);
+
+               pairs.writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "(500500)";
+
+       }
+
+
+       public static class ToTuple implements MapFunction<Long, Tuple1<Long>> {
+
+               @Override
+               public Tuple1<Long> map(Long value) throws Exception {
+                       return new Tuple1<Long>(value);
+               }
+       }
+
+
+}
