[FLINK-1443] Added support for replicated data sources. Introduced a new PartitioningProperty (ANY_DISTRIBUTION) that accepts any data distribution, i.e., random partitioning or full replication.
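For context, here is a minimal usage sketch of the new API, adapted from the ReplicatingDataSourceTest cases added in this commit. The class name, file paths, and job name are placeholders; the wildcard join keys ("*") and the (deprecated) CsvInputFormat constructor follow the Java API used in the tests of this commit.

```java
import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class ReplicatedJoinExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Wrap a regular InputFormat; every parallel source instance then reads
		// the complete input, so the resulting DataSet is fully replicated.
		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
						new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));

		DataSet<Tuple1<String>> replicated = env.createInput(rif,
				new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
		DataSet<Tuple1<String>> other = env.readCsvFile("/some/otherpath").types(String.class);

		// Because the left input is fully replicated, the optimizer can choose FORWARD
		// ship strategies on both join inputs (no network shuffle of 'other'), as long as
		// the parallelism does not change between the replicated source and the join.
		replicated.join(other).where("*").equalTo("*")
				.writeAsText("/some/newpath");

		env.execute("Replicated source join");
	}
}
```

Without the ReplicatingInputFormat wrapper the optimizer would repartition both join inputs. The tests in this commit also verify the failure case: changing the parallelism between a replicated source and the consuming join (or inserting a partitioning operation such as reduce or rebalance) is rejected with a CompilerException.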
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19b4a02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19b4a02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19b4a02

Branch: refs/heads/master
Commit: a19b4a02bfa5237e0dcd2b264da36229546f23c0
Parents: 7452802
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Feb 3 16:03:07 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Feb 5 11:18:04 2015 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/DataSourceNode.java      |  41 +-
 .../flink/compiler/dag/SingleInputNode.java     |  24 +-
 .../apache/flink/compiler/dag/TwoInputNode.java |  33 ++
 .../dataproperties/GlobalProperties.java        |  17 +-
 .../dataproperties/PartitioningProperty.java    |  11 +-
 .../RequestedGlobalProperties.java              |  35 +-
 .../operators/AbstractJoinDescriptor.java       |   2 +-
 .../operators/AllGroupReduceProperties.java     |   2 +-
 .../AllGroupWithPartialPreGroupProperties.java  |   2 +-
 .../operators/CartesianProductDescriptor.java   |   2 +-
 .../compiler/operators/CoGroupDescriptor.java   |   2 +-
 .../operators/CollectorMapDescriptor.java       |   6 +-
 .../compiler/operators/FilterDescriptor.java    |   4 +-
 .../compiler/operators/FlatMapDescriptor.java   |   6 +-
 .../operators/GroupReduceProperties.java        |   2 +-
 .../GroupReduceWithCombineProperties.java       |   2 +-
 .../flink/compiler/operators/MapDescriptor.java |   4 +-
 .../operators/MapPartitionDescriptor.java       |   4 +-
 .../operators/OperatorDescriptorSingle.java     |   8 +-
 .../operators/PartialGroupProperties.java       |   2 +-
 .../compiler/operators/ReduceProperties.java    |   2 +-
 .../apache/flink/compiler/DOPChangeTest.java    |  10 +-
 .../compiler/ReplicatingDataSourceTest.java     | 495 +++++++++++++++++++
 .../GlobalPropertiesFilteringTest.java          |  12 +-
 .../RequestedGlobalPropertiesFilteringTest.java |   2 +-
 .../api/common/io/ReplicatingInputFormat.java   | 115 +++++
 .../io/ReplicatingInputSplitAssigner.java       |  79 +++
 .../ReplicatingDataSourceITCase.java            | 141 ++++++
 28 files changed, 1006 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java index 10c77ca..af2a92b 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.ReplicatingInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.Operator; @@ -48,6 +49,8 @@ public class DataSourceNode extends OptimizerNode { private final boolean sequentialInput; + private final boolean replicatedInput; + /** * Creates a new DataSourceNode for the given contract. 
* @@ -67,6 +70,12 @@ public class DataSourceNode extends OptimizerNode { } else { this.sequentialInput = false; } + + if (pactContract.getUserCodeWrapper().getUserCodeObject() instanceof ReplicatingInputFormat) { + this.replicatedInput = true; + } else { + this.replicatedInput = false; + } } /** @@ -174,17 +183,31 @@ public class DataSourceNode extends OptimizerNode { if (this.cachedPlans != null) { return this.cachedPlans; } - + SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")"); - candidate.updatePropertiesWithUniqueSets(getUniqueFields()); - - final Costs costs = new Costs(); - if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) && - this.estimatedOutputSize >= 0) - { - estimator.addFileInputCost(this.estimatedOutputSize, costs); + + if(!replicatedInput) { + candidate.updatePropertiesWithUniqueSets(getUniqueFields()); + + final Costs costs = new Costs(); + if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) && + this.estimatedOutputSize >= 0) { + estimator.addFileInputCost(this.estimatedOutputSize, costs); + } + candidate.setCosts(costs); + } else { + // replicated input + final Costs costs = new Costs(); + InputFormat<?,?> inputFormat = + ((ReplicatingInputFormat<?,?>)getPactContract().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat(); + if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) && + this.estimatedOutputSize >= 0) { + estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs); + } + candidate.setCosts(costs); + + candidate.getGlobalProperties().setFullyReplicated(); } - candidate.setCosts(costs); // since there is only a single plan for the data-source, return a list with that element only List<PlanNode> plans = new ArrayList<PlanNode>(1); http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java index e7e82fc..70c4291 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java @@ -240,6 +240,8 @@ public abstract class SingleInputNode extends OptimizerNode { return this.cachedPlans; } + boolean childrenSkippedDueToReplicatedInput = false; + // calculate alternative sub-plans for predecessor final List<? 
extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator); final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties(); @@ -279,6 +281,18 @@ public abstract class SingleInputNode extends OptimizerNode { // create all candidates for (PlanNode child : subPlans) { + + if(child.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if DOP is not changed + if(dopChange) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.inConn.setShipStrategy(ShipStrategyType.FORWARD); + } + } + if (this.inConn.getShipStrategy() == null) { // pick the strategy ourselves for (RequestedGlobalProperties igps: intGlobal) { @@ -325,7 +339,15 @@ public abstract class SingleInputNode extends OptimizerNode { } } } - + + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints."); + } + } + // cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java index 5e9a980..b391890 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java @@ -300,6 +300,8 @@ public abstract class TwoInputNode extends OptimizerNode { return this.cachedPlans; } + boolean childrenSkippedDueToReplicatedInput = false; + // step down to all producer nodes and calculate alternative plans final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator); final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator); @@ -353,7 +355,30 @@ public abstract class TwoInputNode extends OptimizerNode { // create all candidates for (PlanNode child1 : subPlans1) { + + if(child1.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if DOP is not changed + if(dopChange1) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input1.setShipStrategy(ShipStrategyType.FORWARD); + } + } + for (PlanNode child2 : subPlans2) { + + if(child2.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if DOP is not changed + if(dopChange2) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input2.setShipStrategy(ShipStrategyType.FORWARD); + } + } // check that the children go together. that is the case if they build upon the same // candidate at the joined branch plan. 
@@ -457,6 +482,14 @@ public abstract class TwoInputNode extends OptimizerNode { } } + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints."); + } + } + // cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java index fb1f1a2..ca7e64d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java @@ -61,7 +61,7 @@ public class GlobalProperties implements Cloneable { * Initializes the global properties with no partitioning. */ public GlobalProperties() { - this.partitioning = PartitioningProperty.RANDOM; + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; } // -------------------------------------------------------------------------------------------- @@ -102,8 +102,8 @@ public class GlobalProperties implements Cloneable { this.ordering = null; } - public void setRandomDistribution() { - this.partitioning = PartitioningProperty.RANDOM; + public void setRandomPartitioned() { + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; this.partitioningFields = null; this.ordering = null; } @@ -224,14 +224,14 @@ public class GlobalProperties implements Cloneable { * Checks, if the properties in this object are trivial, i.e. only standard values. */ public boolean isTrivial() { - return partitioning == PartitioningProperty.RANDOM; + return partitioning == PartitioningProperty.RANDOM_PARTITIONED; } /** * This method resets the properties to a state where no properties are given. */ public void reset() { - this.partitioning = PartitioningProperty.RANDOM; + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; this.ordering = null; this.partitioningFields = null; } @@ -254,8 +254,6 @@ public class GlobalProperties implements Cloneable { // filter partitioning switch(this.partitioning) { - case FULL_REPLICATION: - return gp; case RANGE_PARTITIONED: // check if ordering is preserved Ordering newOrdering = new Ordering(); @@ -308,7 +306,8 @@ public class GlobalProperties implements Cloneable { } break; case FORCED_REBALANCED: - case RANDOM: + case FULL_REPLICATION: + case RANDOM_PARTITIONED: gp.partitioning = this.partitioning; break; default: @@ -350,7 +349,7 @@ public class GlobalProperties implements Cloneable { public void parameterizeChannel(Channel channel, boolean globalDopChange) { switch (this.partitioning) { - case RANDOM: + case RANDOM_PARTITIONED: channel.setShipStrategy(globalDopChange ? 
ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD); break; case FULL_REPLICATION: http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java index 47cd6b8..2b66ea0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java @@ -22,11 +22,16 @@ package org.apache.flink.compiler.dataproperties; * An enumeration tracking the different types of sharding strategies. */ public enum PartitioningProperty { - + + /** + * Any data distribution, i.e., random partitioning or full replication. + */ + ANY_DISTRIBUTION, + /** * Constant indicating no particular partitioning (i.e. random) data distribution. */ - RANDOM, + RANDOM_PARTITIONED, /** * Constant indicating a hash partitioning. @@ -85,7 +90,7 @@ public enum PartitioningProperty { * @return True, if the data is partitioned on a key. */ public boolean isPartitionedOnKey() { - return isPartitioned() && this != RANDOM; + return isPartitioned() && this != RANDOM_PARTITIONED; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java index f304bf6..daaa7dc 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java @@ -53,7 +53,7 @@ public final class RequestedGlobalProperties implements Cloneable { * Initializes the global properties with no partitioning. */ public RequestedGlobalProperties() { - this.partitioning = PartitioningProperty.RANDOM; + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; } // -------------------------------------------------------------------------------------------- @@ -96,8 +96,14 @@ public final class RequestedGlobalProperties implements Cloneable { this.ordering = null; } - public void setRandomDistribution() { - this.partitioning = PartitioningProperty.RANDOM; + public void setRandomPartitioning() { + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; + this.partitioningFields = null; + this.ordering = null; + } + + public void setAnyDistribution() { + this.partitioning = PartitioningProperty.ANY_DISTRIBUTION; this.partitioningFields = null; this.ordering = null; } @@ -174,14 +180,14 @@ public final class RequestedGlobalProperties implements Cloneable { * Checks, if the properties in this object are trivial, i.e. only standard values. 
*/ public boolean isTrivial() { - return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM; + return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM_PARTITIONED; } /** * This method resets the properties to a state where no properties are given. */ public void reset() { - this.partitioning = PartitioningProperty.RANDOM; + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; this.ordering = null; this.partitioningFields = null; this.dataDistribution = null; @@ -208,7 +214,8 @@ public final class RequestedGlobalProperties implements Cloneable { case FULL_REPLICATION: case FORCED_REBALANCED: case CUSTOM_PARTITIONING: - case RANDOM: + case RANDOM_PARTITIONED: + case ANY_DISTRIBUTION: // make sure that certain properties are not pushed down return null; case HASH_PARTITIONED: @@ -255,13 +262,15 @@ public final class RequestedGlobalProperties implements Cloneable { * @return True, if the properties are met, false otherwise. */ public boolean isMetBy(GlobalProperties props) { - if (this.partitioning == PartitioningProperty.FULL_REPLICATION) { + if (this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) { + return true; + } else if (this.partitioning == PartitioningProperty.FULL_REPLICATION) { return props.isFullyReplicated(); } else if (props.isFullyReplicated()) { return false; } - else if (this.partitioning == PartitioningProperty.RANDOM) { + else if (this.partitioning == PartitioningProperty.RANDOM_PARTITIONED) { return true; } else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) { @@ -295,9 +304,17 @@ public final class RequestedGlobalProperties implements Cloneable { * @param globalDopChange */ public void parameterizeChannel(Channel channel, boolean globalDopChange) { + + // safety check. Fully replicated input must be preserved. + if(channel.getSource().getGlobalProperties().isFullyReplicated() && + ( this.partitioning != PartitioningProperty.FULL_REPLICATION || + this.partitioning != PartitioningProperty.ANY_DISTRIBUTION)) { + throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property."); + } + // if we request nothing, then we need no special strategy. forward, if the number of instances remains // the same, randomly repartition otherwise - if (isTrivial()) { + if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) { channel.setShipStrategy(globalDopChange ? 
ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD); return; } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java index d8f7746..b1c3079 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java @@ -144,7 +144,7 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual { public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { GlobalProperties gp = GlobalProperties.combine(in1, in2); if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 && - gp.getPartitioning() == PartitioningProperty.RANDOM) + gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java index 0390c06..2d74bbe 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java @@ -57,7 +57,7 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle { @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java index 2c2ddf1..54885a7 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java @@ -79,7 +79,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java index fefd71a..cca0bb0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java @@ -100,7 +100,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { GlobalProperties gp = GlobalProperties.combine(in1, in2); if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 && - gp.getPartitioning() == PartitioningProperty.RANDOM) + gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java index bc83c51..ff4ca6e 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java @@ -183,7 +183,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { GlobalProperties gp = GlobalProperties.combine(in1, in2); if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 && - gp.getPartitioning() == PartitioningProperty.RANDOM) + gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java index 23c32a7..6007709 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java @@ -47,7 +47,9 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle { @Override protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return 
Collections.singletonList(rgp); } @Override @@ -58,7 +60,7 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle { @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java index b7ee761..0b4f373 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java @@ -46,7 +46,9 @@ public class FilterDescriptor extends OperatorDescriptorSingle { @Override protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java index 66993c4..6b06232 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java @@ -47,7 +47,9 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle { @Override protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); } @Override @@ -58,7 +60,7 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle { @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java index ab93170..8d52503 100644 --- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java @@ -114,7 +114,7 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle { @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java index 8604951..fd263e6 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java @@ -146,7 +146,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java index cb1d258..673716d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java @@ -46,7 +46,9 @@ public class MapDescriptor extends OperatorDescriptorSingle { @Override protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java index f0af88b..dc67321 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java +++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java @@ -46,7 +46,9 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle { @Override protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java index 45daceb..7919b2b 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java @@ -38,7 +38,7 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri protected final FieldSet keys; // the set of key fields protected final FieldList keyList; // the key fields with ordered field positions - + private List<RequestedGlobalProperties> globalProps; private List<RequestedLocalProperties> localProps; @@ -51,8 +51,8 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri this.keys = keys; this.keyList = keys == null ? null : keys.toFieldList(); } - - + + public List<RequestedGlobalProperties> getPossibleGlobalProperties() { if (this.globalProps == null) { this.globalProps = createPossibleGlobalProperties(); @@ -66,7 +66,7 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri } return this.localProps; } - + /** * Returns a list of global properties that are required by this operator descriptor. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java index cf33bbe..7954773 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java @@ -76,7 +76,7 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle { @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java index 813af20..000079d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java @@ -103,7 +103,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle { @Override public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM) + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java index c3a4c3a..c90a89b 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java @@ -17,6 +17,7 @@ */ package org.apache.flink.compiler; +import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator; import org.junit.Assert; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.record.operators.FileDataSink; @@ -257,8 +258,15 @@ public class DOPChangeTest extends CompilerTestBase { // mapper respectively reducer SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); + SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); + + Assert.assertTrue("The no sorting local strategy.", + LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() || + 
LocalStrategy.SORT == map2Node.getInput().getLocalStrategy()); - Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy()); + Assert.assertTrue("The no partitioning ship strategy.", + ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() || + ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java new file mode 100644 index 0000000..1cbac25 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.io.ReplicatingInputFormat; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.compiler.plan.DualInputPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.SinkPlanNode; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +@SuppressWarnings({"serial", "deprecation"}) +public class ReplicatingDataSourceTest extends CompilerTestBase { + + /** + * Tests join program with replicated data source. 
+ */ + @Test + public void checkJoinWithReplicatedSourceInput() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when join should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy(); + ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2); + } + + /** + * Tests join program with replicated data source behind map. + */ + @Test + public void checkJoinWithReplicatedSourceInputBehindMap() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .map(new IdMap()) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when join should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy(); + ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2); + } + + /** + * Tests join program with replicated data source behind filter. 
+ */ + @Test + public void checkJoinWithReplicatedSourceInputBehindFilter() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .filter(new NoFilter()) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when join should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy(); + ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2); + } + + /** + * Tests join program with replicated data source behind flatMap. + */ + @Test + public void checkJoinWithReplicatedSourceInputBehindFlatMap() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .flatMap(new IdFlatMap()) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when join should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy(); + ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2); + } + + /** + * Tests join program with replicated data source behind map partition. 
+ */ + @Test + public void checkJoinWithReplicatedSourceInputBehindMapPartition() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .mapPartition(new IdPMap()) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when join should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy(); + ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2); + } + + /** + * Tests join program with replicated data source behind multiple map ops. + */ + @Test + public void checkJoinWithReplicatedSourceInputBehindMultiMaps() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .filter(new NoFilter()) + .mapPartition(new IdPMap()) + .flatMap(new IdFlatMap()) + .map(new IdMap()) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when join should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy(); + ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2); + } + + /** + * Tests cross program with replicated data source. 
+ */ + @Test + public void checkCrossWithReplicatedSourceInput() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .cross(source2) + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when cross should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy(); + ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2); + } + + /** + * Tests cross program with replicated data source behind map and filter. + */ + @Test + public void checkCrossWithReplicatedSourceInputBehindMap() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .map(new IdMap()) + .filter(new NoFilter()) + .cross(source2) + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when cross should have forward strategy on both sides + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor(); + + ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy(); + ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2); + } + + /** + * Tests compiler fail for join program with replicated data source and changing DOP. 
+ */ + @Test(expected = CompilerException.class) + public void checkJoinWithReplicatedSourceInputChangingDOP() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .join(source2).where("*").equalTo("*").setParallelism(DEFAULT_PARALLELISM+2) + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + } + + /** + * Tests compiler fail for join program with replicated data source behind map and changing DOP. + */ + @Test(expected = CompilerException.class) + public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .map(new IdMap()).setParallelism(DEFAULT_PARALLELISM+1) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + } + + /** + * Tests compiler fail for join program with replicated data source behind reduce. + */ + @Test(expected = CompilerException.class) + public void checkJoinWithReplicatedSourceInputBehindReduce() { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .reduce(new LastReduce()) + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + } + + /** + * Tests compiler fail for join program with replicated data source behind rebalance. 
+ */ + @Test(expected = CompilerException.class) + public void checkJoinWithReplicatedSourceInputBehindRebalance() { + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif = + new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class)); + + DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO)); + DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class); + + DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1 + .rebalance() + .join(source2).where("*").equalTo("*") + .writeAsText("/some/newpath"); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + } + + + public static class IdMap<T> implements MapFunction<T,T> { + + @Override + public T map(T value) throws Exception { + return value; + } + } + + public static class NoFilter<T> implements FilterFunction<T> { + + @Override + public boolean filter(T value) throws Exception { + return false; + } + } + + public static class IdFlatMap<T> implements FlatMapFunction<T,T> { + + @Override + public void flatMap(T value, Collector<T> out) throws Exception { + out.collect(value); + } + } + + public static class IdPMap<T> implements MapPartitionFunction<T,T> { + + @Override + public void mapPartition(Iterable<T> values, Collector<T> out) throws Exception { + for(T v : values) { + out.collect(v); + } + } + } + + public static class LastReduce<T> implements ReduceFunction<T> { + + @Override + public T reduce(T value1, T value2) throws Exception { + return value2; + } + } + + +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java index f8e2242..ff7530a 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java @@ -59,7 +59,7 @@ public class GlobalPropertiesFilteringTest { GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0); - assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); assertNull(result.getPartitioningFields()); assertNull(result.getPartitioningOrdering()); assertNull(result.getUniqueFieldCombination()); @@ -78,7 +78,7 @@ public class GlobalPropertiesFilteringTest { GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0); - assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); assertNull(result.getPartitioningFields()); assertNull(result.getPartitioningOrdering()); assertNull(result.getUniqueFieldCombination()); @@ -133,7 +133,7 @@ public class GlobalPropertiesFilteringTest { GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - 
assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); assertNull(result.getPartitioningFields()); } @@ -186,7 +186,7 @@ public class GlobalPropertiesFilteringTest { GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); assertNull(result.getPartitioningFields()); } @@ -242,7 +242,7 @@ public class GlobalPropertiesFilteringTest { GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); assertNull(result.getPartitioningFields()); assertNull(result.getCustomPartitioner()); } @@ -330,7 +330,7 @@ public class GlobalPropertiesFilteringTest { GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); - assertEquals(PartitioningProperty.RANDOM, result.getPartitioning()); + assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning()); assertNull(result.getPartitioningOrdering()); assertNull(result.getPartitioningFields()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java index e094640..3f9c0db 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java @@ -345,7 +345,7 @@ public class RequestedGlobalPropertiesFilteringTest { SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); - rgProps.setRandomDistribution(); + rgProps.setRandomPartitioning(); RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java new file mode 100644 index 0000000..0adccaf --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io; + + +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; + +import java.io.IOException; + +/** + * A ReplicatingInputFormat replicates any {@link InputFormat} to all parallel instances of a DataSource, + * i.e., the full input of the replicated InputFormat is completely processed by each parallel instance of the DataSource. + * This is done by assigning all {@link org.apache.flink.core.io.InputSplit}s generated by the + * replicated InputFormat to each parallel instance. + * + * Replicated data can only be used as input for a {@link org.apache.flink.api.common.operators.base.JoinOperatorBase} or + * {@link org.apache.flink.api.common.operators.base.CrossOperatorBase} with the same degree of parallelism as the DataSource. + * Before being used as an input to a Join or Cross operator, replicated data might be processed in local pipelines by + * Map-based operators with the same degree of parallelism as the source. Map-based operators are + * {@link org.apache.flink.api.common.operators.base.MapOperatorBase}, + * {@link org.apache.flink.api.common.operators.base.FlatMapOperatorBase}, + * {@link org.apache.flink.api.common.operators.base.FilterOperatorBase}, and + * {@link org.apache.flink.api.common.operators.base.MapPartitionOperatorBase}. + * + * Replicated DataSources can be used for local join processing (no data shipping) if one input is accessible on all + * parallel instances of a join and the other input is (randomly) partitioned across all parallel instances. + * + * However, a replicated DataSource is a plan hint that can invalidate a Flink program if not used correctly (see + * usage instructions above). In such situations, the optimizer is not able to generate a valid execution plan and + * the program execution will fail. + * + * @param <OT> The output type of the wrapped InputFormat. + * @param <S> The InputSplit type of the wrapped InputFormat.
+ * + * @see org.apache.flink.api.common.io.InputFormat + * @see org.apache.flink.api.common.operators.base.JoinOperatorBase + * @see org.apache.flink.api.common.operators.base.CrossOperatorBase + * @see org.apache.flink.api.common.operators.base.MapOperatorBase + * @see org.apache.flink.api.common.operators.base.FlatMapOperatorBase + * @see org.apache.flink.api.common.operators.base.FilterOperatorBase + * @see org.apache.flink.api.common.operators.base.MapPartitionOperatorBase + */ +public final class ReplicatingInputFormat<OT, S extends InputSplit> implements InputFormat<OT, S> { + + private static final long serialVersionUID = 1L; + + private InputFormat<OT, S> replicatedIF; + + public ReplicatingInputFormat(InputFormat<OT, S> wrappedIF) { + this.replicatedIF = wrappedIF; + } + + public InputFormat<OT, S> getReplicatedInputFormat() { + return this.replicatedIF; + } + + @Override + public void configure(Configuration parameters) { + this.replicatedIF.configure(parameters); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return this.replicatedIF.getStatistics(cachedStatistics); + } + + @Override + public S[] createInputSplits(int minNumSplits) throws IOException { + return this.replicatedIF.createInputSplits(minNumSplits); + } + + @Override + public InputSplitAssigner getInputSplitAssigner(S[] inputSplits) { + return new ReplicatingInputSplitAssigner(inputSplits); + } + + @Override + public void open(S split) throws IOException { + this.replicatedIF.open(split); + } + + @Override + public boolean reachedEnd() throws IOException { + return this.replicatedIF.reachedEnd(); + } + + @Override + public OT nextRecord(OT reuse) throws IOException { + return this.replicatedIF.nextRecord(reuse); + } + + @Override + public void close() throws IOException { + this.replicatedIF.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java new file mode 100644 index 0000000..315fbcd --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.io; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Assigns each InputSplit to each requesting parallel instance. + * This causes the input to be fully replicated, i.e., each parallel instance consumes the full input. + */ +public class ReplicatingInputSplitAssigner implements InputSplitAssigner { + + private InputSplit[] inputSplits; + + private int[] assignCounts; + + public ReplicatingInputSplitAssigner(Collection<InputSplit> splits) { + this.inputSplits = new InputSplit[splits.size()]; + this.inputSplits = splits.toArray(this.inputSplits); + this.assignCounts = new int[32]; + Arrays.fill(assignCounts, 0); + } + + public ReplicatingInputSplitAssigner(InputSplit[] splits) { + this.inputSplits = splits; + this.assignCounts = new int[32]; + Arrays.fill(assignCounts, 0); + } + + @Override + public InputSplit getNextInputSplit(String host, int taskId) { + + // get assignment count for this task + Integer assignCnt; + if(taskId < this.assignCounts.length) { + assignCnt = this.assignCounts[taskId]; + } else { + // grow the counter array so that it covers taskId and keep the grown array + int newSize = this.assignCounts.length * 2; + if (taskId >= newSize) { + newSize = taskId + 1; + } + int[] newAssignCounts = Arrays.copyOf(assignCounts, newSize); + Arrays.fill(newAssignCounts, assignCounts.length, newSize, 0); + this.assignCounts = newAssignCounts; + + assignCnt = 0; + } + + if(assignCnt >= inputSplits.length) { + // all splits for this task have been assigned + return null; + } else { + // return the next split for this task + InputSplit is = inputSplits[assignCnt]; + assignCounts[taskId] = assignCnt+1; + return is; + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java new file mode 100644 index 0000000..85e3e11 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.javaApiOperators; + + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.io.ReplicatingInputFormat; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.ParallelIteratorInputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.NumberSequenceIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Tests for replicating DataSources + */ + +@RunWith(Parameterized.class) +public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase { + + public ReplicatingDataSourceITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); // 500500 = 0+1+2+3+...+999+1000 + } + + @Test + public void testReplicatedSourceToJoin() throws Exception { + /* + * Test replicated source going into join + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit> + (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0l, 1000l))), BasicTypeInfo.LONG_TYPE_INFO) + .map(new ToTuple()); + DataSet<Tuple1<Long>> source2 = env.generateSequence(0l, 1000l).map(new ToTuple()); + + DataSet<Tuple> pairs = source1.join(source2).where(0).equalTo(0) + .projectFirst(0) + .sum(0); + + pairs.writeAsText(resultPath); + env.execute(); + + expectedResult = "(500500)"; + + } + + @Test + public void testReplicatedSourceToCross() throws Exception { + /* + * Test replicated source going into cross + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit> + (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0l, 1000l))), BasicTypeInfo.LONG_TYPE_INFO) + .map(new ToTuple()); + DataSet<Tuple1<Long>> source2 = env.generateSequence(0l, 1000l).map(new ToTuple()); + + DataSet<Tuple1<Long>> pairs = source1.cross(source2) + .filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() { + @Override + public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception { + return value.f0.f0.equals(value.f1.f0); + } + }) + .map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() { + @Override + public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception { + return value.f0; + } + }) + .sum(0); + + pairs.writeAsText(resultPath); + env.execute(); + + expectedResult = "(500500)"; + + } + + + public static class 
ToTuple implements MapFunction<Long, Tuple1<Long>> { + + @Override + public Tuple1<Long> map(Long value) throws Exception { + return new Tuple1<Long>(value); + } + } + + +}
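
For reference, the intended usage of the new ReplicatingInputFormat looks roughly as follows. This is an illustrative sketch only and not part of the commit: the input paths, job name, and parallelism value are placeholders, and the API calls mirror those exercised in ReplicatingDataSourceTest and ReplicatingDataSourceITCase above. Because the left input is fully replicated and both sides keep the same degree of parallelism, the optimizer can pick the FORWARD ship strategy on both join inputs, i.e., the join runs without a network shuffle.

// Illustrative sketch only -- paths and parallelism are placeholders.
package example;

import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class ReplicatedSourceJoinExample {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setDegreeOfParallelism(4); // source, local pipeline, and join must share this DOP

    // Wrap a regular InputFormat; every parallel source instance reads all of its splits.
    ReplicatingInputFormat<Tuple1<String>, FileInputSplit> replicated =
        new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
            new CsvInputFormat<Tuple1<String>>(new Path("/path/to/small/input"), String.class));

    DataSet<Tuple1<String>> replicatedSide =
        env.createInput(replicated, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));

    // The other side remains (randomly) partitioned across the parallel instances.
    DataSet<Tuple1<String>> partitionedSide =
        env.readCsvFile("/path/to/large/input").types(String.class);

    // With one side replicated, both join inputs can be forwarded locally
    // instead of being repartitioned over the network.
    DataSet<Tuple2<Tuple1<String>, Tuple1<String>>> joined =
        replicatedSide.join(partitionedSide).where(0).equalTo(0);

    joined.writeAsText("/path/to/output");
    env.execute("Replicated source join (sketch)");
  }
}

Any map-style operator (map, flatMap, filter, mapPartition) may sit between the replicated source and the join as long as it keeps the same degree of parallelism; changing the parallelism or inserting a reduce or rebalance breaks the replication guarantee and causes the compiler to reject the plan, as the tests above assert.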