Repository: flink Updated Branches: refs/heads/master 0df5601ad -> b0a57c32f
[FLINK-1628] [optimizer] Fix partitioning properties for Joins and CoGroups. This closes #458 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a57c32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a57c32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a57c32 Branch: refs/heads/master Commit: b0a57c32fc68d4a1369e5ece25d3a6e986ee1e2a Parents: 0df5601 Author: Fabian Hueske <fhue...@apache.org> Authored: Wed Mar 4 18:49:22 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Mar 10 01:43:41 2015 +0100 ---------------------------------------------------------------------- .../dataproperties/GlobalProperties.java | 8 + .../dataproperties/PartitioningProperty.java | 2 +- .../RequestedGlobalProperties.java | 59 +- .../operators/AbstractJoinDescriptor.java | 49 +- .../compiler/operators/CoGroupDescriptor.java | 72 +- .../operators/OperatorDescriptorDual.java | 44 +- .../operators/SortMergeJoinDescriptor.java | 13 +- .../compiler/FeedbackPropertiesMatchTest.java | 8 +- .../flink/compiler/PartitioningReusageTest.java | 859 +++++++++++++++++++ .../GlobalPropertiesMatchingTest.java | 152 +++- .../RequestedGlobalPropertiesFilteringTest.java | 2 +- .../api/common/operators/util/FieldList.java | 15 +- 12 files changed, 1224 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java index ca7e64d..31e13ae 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java +++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java @@ -183,6 +183,14 @@ public class GlobalProperties implements Cloneable { return false; } } + + public boolean isExactlyPartitionedOnFields(FieldList fields) { + if (this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields)) { + return true; + } else { + return false; + } + } public boolean matchesOrderedPartitioning(Ordering o) { if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) { http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java index 2b66ea0..45e323e 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java @@ -71,7 +71,7 @@ public enum PartitioningProperty { * false otherwise. 
*/ public boolean isPartitioned() { - return this != FULL_REPLICATION && this != FORCED_REBALANCED; + return this != FULL_REPLICATION && this != FORCED_REBALANCED && this != ANY_DISTRIBUTION; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java index daaa7dc..f4334ff 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.Channel; @@ -46,7 +47,7 @@ public final class RequestedGlobalProperties implements Cloneable { private DataDistribution dataDistribution; // optional data distribution, for a range partitioning private Partitioner<?> customPartitioner; // optional, partitioner for custom partitioning - + // -------------------------------------------------------------------------------------------- /** @@ -60,7 +61,9 @@ public final class RequestedGlobalProperties implements Cloneable { /** * Sets the partitioning property for the global properties. 
- * + * If the partitionFields are provided as {@link FieldSet} also subsets are valid, + * if provided as {@link FieldList} partitioning fields must exactly match incl. order. + * * @param partitionedFields */ public void setHashPartitioned(FieldSet partitionedFields) { @@ -86,7 +89,14 @@ public final class RequestedGlobalProperties implements Cloneable { this.partitioningFields = null; this.dataDistribution = dataDistribution; } - + + /** + * Sets the partitioning property for the global properties. + * If the partitionFields are provided as {@link FieldSet} also subsets are valid, + * if provided as {@link FieldList} partitioning fields must exactly match incl. order. + * + * @param partitionedFields + */ public void setAnyPartitioning(FieldSet partitionedFields) { if (partitionedFields == null) { throw new NullPointerException(); @@ -119,7 +129,14 @@ public final class RequestedGlobalProperties implements Cloneable { this.partitioningFields = null; this.ordering = null; } - + + /** + * Sets the partitioning property for the global properties. + * If the partitionFields are provided as {@link FieldSet} also subsets are valid, + * if provided as {@link FieldList} partitioning fields must exactly match incl. order. + * + * @param partitionedFields + */ public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) { if (partitionedFields == null || partitioner == null) { throw new NullPointerException(); @@ -130,7 +147,7 @@ public final class RequestedGlobalProperties implements Cloneable { this.ordering = null; this.customPartitioner = partitioner; } - + /** * Gets the partitioning property. * @@ -148,7 +165,7 @@ public final class RequestedGlobalProperties implements Cloneable { public FieldSet getPartitionedFields() { return this.partitioningFields; } - + /** * Gets the key order. 
* @@ -220,7 +237,13 @@ public final class RequestedGlobalProperties implements Cloneable { return null; case HASH_PARTITIONED: case ANY_PARTITIONING: - FieldSet newFields = new FieldSet(); + FieldSet newFields; + if(this.partitioningFields instanceof FieldList) { + newFields = new FieldList(); + } else { + newFields = new FieldSet(); + } + for (Integer targetField : this.partitioningFields) { int sourceField = props.getForwardingSourceField(input, targetField); if (sourceField >= 0) { @@ -274,11 +297,11 @@ public final class RequestedGlobalProperties implements Cloneable { return true; } else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) { - return props.isPartitionedOnFields(this.partitioningFields); + return checkCompatiblePartitioningFields(props); } else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) { return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && - props.isPartitionedOnFields(this.partitioningFields); + checkCompatiblePartitioningFields(props); } else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) { return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && @@ -289,14 +312,15 @@ public final class RequestedGlobalProperties implements Cloneable { } else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) { return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && - props.isPartitionedOnFields(this.partitioningFields) && + checkCompatiblePartitioningFields(props) && props.getCustomPartitioner().equals(this.customPartitioner); + } else { throw new CompilerException("Properties matching logic leaves open cases."); } } - + /** * Parameterizes the ship strategy fields of a channel such that the channel produces the desired global properties. * @@ -307,8 +331,8 @@ public final class RequestedGlobalProperties implements Cloneable { // safety check. Fully replicated input must be preserved. 
if(channel.getSource().getGlobalProperties().isFullyReplicated() && - ( this.partitioning != PartitioningProperty.FULL_REPLICATION || - this.partitioning != PartitioningProperty.ANY_DISTRIBUTION)) { + !(this.partitioning == PartitioningProperty.FULL_REPLICATION || + this.partitioning == PartitioningProperty.ANY_DISTRIBUTION)) { throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property."); } @@ -397,4 +421,13 @@ public final class RequestedGlobalProperties implements Cloneable { throw new RuntimeException(cnse); } } + + private boolean checkCompatiblePartitioningFields(GlobalProperties props) { + if(this.partitioningFields instanceof FieldList) { + // partitioningFields as FieldList requires strict checking! + return props.isExactlyPartitionedOnFields((FieldList)this.partitioningFields); + } else { + return props.isPartitionedOnFields(this.partitioningFields); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java index b1c3079..21dd3f4 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java @@ -64,15 +64,15 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual { if (repartitionAllowed) { // partition both (hash or custom) if (this.customPartitioner == null) { - + // we accept compatible partitionings of any type RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties(); RequestedGlobalProperties partitioned_right_any = new 
RequestedGlobalProperties(); partitioned_left_any.setAnyPartitioning(this.keys1); partitioned_right_any.setAnyPartitioning(this.keys2); pairs.add(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any)); - - // we also explicitly add hash partitioning, as a fallback, if the any-pairs do not match + + // add strict hash partitioning of both inputs on their full key sets RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); partitioned_left_hash.setHashPartitioned(this.keys1); @@ -82,10 +82,10 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual { else { RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties(); partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner); - + RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties(); partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner); - + return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right)); } @@ -130,10 +130,40 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual { GlobalProperties produced1, GlobalProperties produced2) { if (requested1.getPartitioning().isPartitionedOnKey() && requested2.getPartitioning().isPartitionedOnKey()) { - return produced1.getPartitioning() == produced2.getPartitioning() && - (produced1.getCustomPartitioner() == null ? 
- produced2.getCustomPartitioner() == null : - produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner())); + + if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // both are hash partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { + + // both are range partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && + produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) { + + // both use a custom partitioner. Check that both keys are exactly as specified and that both the same partitioner + return produced1.getPartitioningFields().isExactMatch(this.keys1) && + produced2.getPartitioningFields().isExactMatch(this.keys2) && + produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null && + produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()); + + } + else { + + // no other partitioning valid, incl. ANY_PARTITIONING. + // For joins we must ensure that both sides are exactly identically partitioned, ANY is not good enough. 
+ return false; + } + } else { return true; } @@ -151,4 +181,5 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual { gp.clearUniqueFieldCombinations(); return gp; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java index ff4ca6e..f8d99f8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java @@ -99,27 +99,31 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { @Override protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { + if (this.customPartitioner == null) { + + // we accept compatible partitionings of any type RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties(); - RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); - partitioned_left_any.setAnyPartitioning(this.keys1); - partitioned_left_hash.setHashPartitioned(this.keys1); - RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties(); - RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); + partitioned_left_any.setAnyPartitioning(this.keys1); partitioned_right_any.setAnyPartitioning(this.keys2); + + // add strict hash partitioning of both inputs on their full key sets + RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); + RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); + partitioned_left_hash.setHashPartitioned(this.keys1); partitioned_right_hash.setHashPartitioned(this.keys2); - 
+ return Arrays.asList(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any), new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash)); } else { RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties(); partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner); - + RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties(); partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner); - + return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right)); } } @@ -135,10 +139,40 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, GlobalProperties produced1, GlobalProperties produced2) { - return produced1.getPartitioning() == produced2.getPartitioning() && - (produced1.getCustomPartitioner() == null ? - produced2.getCustomPartitioner() == null : - produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner())); + + if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // both are hash partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && + produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { + + // both are range partitioned, check that partitioning fields are equivalently chosen + return checkEquivalentFieldPositionsInKeyFields( + produced1.getPartitioningFields(), produced2.getPartitioningFields()); + + } + else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && + produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) { + + 
// both use a custom partitioner. Check that both keys are exactly as specified and that both the same partitioner + return produced1.getPartitioningFields().isExactMatch(this.keys1) && + produced2.getPartitioningFields().isExactMatch(this.keys2) && + produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null && + produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()); + + } + else { + + // no other partitioning valid, incl. ANY_PARTITIONING. + // For co-groups we must ensure that both sides are exactly identically partitioned, ANY is not good enough. + return false; + } + } @Override @@ -150,12 +184,17 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { Ordering prod1 = produced1.getOrdering(); Ordering prod2 = produced2.getOrdering(); - if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields || - prod2.getNumberOfFields() < prod2.getNumberOfFields()) - { + if (prod1 == null || prod2 == null) { throw new CompilerException("The given properties do not meet this operators requirements."); } - + + // check that order of fields is equivalent + if (!checkEquivalentFieldPositionsInKeyFields( + prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) { + return false; + } + + // check that order directions are equivalent for (int i = 0; i < numRelevantFields; i++) { if (prod1.getOrder(i) != prod2.getOrder(i)) { return false; @@ -196,4 +235,5 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { LocalProperties comb = LocalProperties.combine(in1, in2); return comb.clearUniqueFieldSets(); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java index 8eca16e..f72e6b5 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java @@ -22,6 +22,7 @@ package org.apache.flink.compiler.operators; import java.util.List; import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.dag.TwoInputNode; import org.apache.flink.compiler.dataproperties.GlobalProperties; import org.apache.flink.compiler.dataproperties.LocalProperties; @@ -81,7 +82,48 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2); public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2); - + + protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) { + + // check number of produced partitioning fields + if(fields1.size() != fields2.size()) { + return false; + } else { + return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size()); + } + } + + protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) { + + // check number of produced partitioning fields + if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) { + return false; + } + else { + for(int i=0; i<numRelevantFields; i++) { + int pField1 = fields1.get(i); + int pField2 = fields2.get(i); + // check if position of both produced fields is the same in both requested fields + int j; + for(j=0; j<this.keys1.size(); j++) { + if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) { + break; + } + else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != 
pField2) { + // do nothing + } + else { + return false; + } + } + if(j == this.keys1.size()) { + throw new CompilerException("Fields were not found in key fields."); + } + } + } + return true; + } + // -------------------------------------------------------------------------------------------- public static final class GlobalPropertiesPair { http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java index cd6094e..4ca82d5 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java @@ -68,12 +68,17 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor { Ordering prod1 = produced1.getOrdering(); Ordering prod2 = produced2.getOrdering(); - if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields || - prod2.getNumberOfFields() < prod2.getNumberOfFields()) - { + if (prod1 == null || prod2 == null) { throw new CompilerException("The given properties do not meet this operators requirements."); } - + + // check that order of fields is equivalent + if (!checkEquivalentFieldPositionsInKeyFields( + prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) { + return false; + } + + // check that both inputs have the same directions of order for (int i = 0; i < numRelevantFields; i++) { if (prod1.getOrder(i) != prod2.getOrder(i)) { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java 
---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java index e3f5267..677d9be 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java @@ -211,7 +211,7 @@ public class FeedbackPropertiesMatchTest { LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); + reqGp.setHashPartitioned(new FieldSet(2, 5)); RequestedLocalProperties reqLp = new RequestedLocalProperties(); reqLp.setGroupedFields(new FieldList(1)); @@ -375,7 +375,7 @@ public class FeedbackPropertiesMatchTest { LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2)); RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); + reqGp.setHashPartitioned(new FieldSet(2, 5)); RequestedLocalProperties reqLp = new RequestedLocalProperties(); reqLp.setGroupedFields(new FieldList(1)); @@ -397,7 +397,7 @@ public class FeedbackPropertiesMatchTest { LocalProperties lp = new LocalProperties(); RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setHashPartitioned(new FieldList(2, 5)); + reqGp.setHashPartitioned(new FieldSet(2, 5)); toMap1.setRequiredGlobalProps(null); toMap1.setRequiredLocalProps(null); @@ -434,7 +434,7 @@ public class FeedbackPropertiesMatchTest { LocalProperties lp = LocalProperties.forGrouping(new FieldList(1)); RequestedGlobalProperties reqGp = new RequestedGlobalProperties(); - reqGp.setAnyPartitioning(new FieldList(2, 5)); + reqGp.setAnyPartitioning(new FieldSet(2, 5)); RequestedLocalProperties reqLp = new RequestedLocalProperties(); 
reqLp.setGroupedFields(new FieldList(1)); http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java new file mode 100644 index 0000000..c1ebd7b --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java @@ -0,0 +1,859 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.compiler.dag.JoinNode; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; +import org.apache.flink.compiler.dataproperties.PartitioningProperty; +import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; +import org.apache.flink.compiler.plan.Channel; +import org.apache.flink.compiler.plan.DualInputPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.PlanNode; +import org.apache.flink.compiler.plan.SingleInputPlanNode; +import org.apache.flink.compiler.plan.SinkPlanNode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Visitor; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings("serial") +public class PartitioningReusageTest extends CompilerTestBase { + + @Test + public void noPreviousPartitioningJoin1() { + ExecutionEnvironment env 
= ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void noPreviousPartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void reuseSinglePartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + + } + + @Test + public void reuseSinglePartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> 
joined = set1 + .join(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseSinglePartitioningJoin5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .join(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2"), + 
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2.partitionByHash(0,1) + .map(new MockMapper()) + .withForwardedFields("0;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(0,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + + @Test + public void reuseBothPartitioningJoin2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .join(set2.partitionByHash(1,2) + .map(new MockMapper()) + 
.withForwardedFields("1;2"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .join(set2.partitionByHash(2,1) + .map(new MockMapper()) + .withForwardedFields("2;1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,1).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0,2) + .map(new MockMapper()).withForwardedFields("0;2") + .join(set2.partitionByHash(1) + 
.map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(2,1).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + 
.join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(1,2).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + @Test + public void reuseBothPartitioningJoin7() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> joined = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .join(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1"), + JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) + .where(0,2).equalTo(1,2).with(new MockJoin()); + + joined.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource(); + + checkValidJoinInputProperties(join); + } + + + @Test + public void noPreviousPartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2) + .where(0,1).equalTo(0,1).with(new 
MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void noPreviousPartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void reuseSinglePartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2) + .where(0,1).equalTo(0,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= 
(DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + + } + + @Test + public void reuseSinglePartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1")) + .where(0,1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup4() { + 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .coGroup(set2) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseSinglePartitioningCoGroup5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .coGroup(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2")) + .where(0,1).equalTo(2,1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, 
Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2.partitionByHash(0, 1) + .map(new MockMapper()) + .withForwardedFields("0;1")) + .where(0, 1).equalTo(0, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + + @Test + public void reuseBothPartitioningCoGroup2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,1) + .map(new MockMapper()).withForwardedFields("0;1") + .coGroup(set2.partitionByHash(1, 2) + .map(new MockMapper()) + .withForwardedFields("1;2")) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + 
DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0) + .map(new MockMapper()).withForwardedFields("0") + .coGroup(set2.partitionByHash(2, 1) + .map(new MockMapper()) + .withForwardedFields("2;1")) + .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(0,2) + .map(new MockMapper()).withForwardedFields("0;2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, 
Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(2) + .map(new MockMapper()) + .withForwardedFields("2")) + .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + @Test + public void reuseBothPartitioningCoGroup7() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + DataSet<Tuple3<Integer, Integer, Integer>> set2 = 
env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class); + + DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1 + .partitionByHash(2) + .map(new MockMapper()).withForwardedFields("2") + .coGroup(set2.partitionByHash(1) + .map(new MockMapper()) + .withForwardedFields("1")) + .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); + + coGrouped.print(); + JavaPlan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileWithStats(plan); + + SinkPlanNode sink = oPlan.getDataSinks().iterator().next(); + DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource(); + + checkValidCoGroupInputProperties(coGroup); + } + + + + private void checkValidJoinInputProperties(DualInputPlanNode join) { + + GlobalProperties inProps1 = join.getInput1().getGlobalProperties(); + GlobalProperties inProps2 = join.getInput2().getGlobalProperties(); + + if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // check that both inputs are hash partitioned on the same fields + FieldList pFields1 = inProps1.getPartitioningFields(); + FieldList pFields2 = inProps2.getPartitioningFields(); + + assertTrue("Inputs are not the same number of fields. 
Input 1: "+pFields1+", Input 2: "+pFields2, + pFields1.size() == pFields2.size()); + + FieldList reqPFields1 = join.getKeysForInput1(); + FieldList reqPFields2 = join.getKeysForInput2(); + + for(int i=0; i<pFields1.size(); i++) { + + // get fields + int f1 = pFields1.get(i); + int f2 = pFields2.get(i); + + // check that field positions in original key field list are identical + int pos1 = getPosInFieldList(f1, reqPFields1); + int pos2 = getPosInFieldList(f2, reqPFields2); + + if(pos1 < 0) { + fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1); + } + if(pos2 < 0) { + fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2); + } + if(pos1 != pos2) { + fail("Inputs are not partitioned on the same key fields"); + } + } + + } + else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION && + inProps2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { + // we are good. No need to check for fields + } + else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED && + inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) { + // we are good. No need to check for fields + } + else { + throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned or replicated/random join inputs (input 1: "+inProps1.getPartitioning()+", input 2: "+inProps2.getPartitioning()+")"); + } + + } + + private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) { + + GlobalProperties inProps1 = coGroup.getInput1().getGlobalProperties(); + GlobalProperties inProps2 = coGroup.getInput2().getGlobalProperties(); + + if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && + inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { + + // check that both inputs are hash partitioned on the same fields + FieldList pFields1 = inProps1.getPartitioningFields(); + FieldList pFields2 = inProps2.getPartitioningFields(); + + assertTrue("Inputs are not the same number of fields.
Input 1: "+pFields1+", Input 2: "+pFields2, + pFields1.size() == pFields2.size()); + + FieldList reqPFields1 = coGroup.getKeysForInput1(); + FieldList reqPFields2 = coGroup.getKeysForInput2(); + + for(int i=0; i<pFields1.size(); i++) { + + // get fields + int f1 = pFields1.get(i); + int f2 = pFields2.get(i); + + // check that field positions in original key field list are identical + int pos1 = getPosInFieldList(f1, reqPFields1); + int pos2 = getPosInFieldList(f2, reqPFields2); + + if(pos1 < 0) { + fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1); + } + if(pos2 < 0) { + fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2); + } + if(pos1 != pos2) { + fail("Inputs are not partitioned on the same key fields"); + } + } + + } + else { + throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs"); + } + + } + + private int getPosInFieldList(int field, FieldList list) { + + int pos; + for(pos=0; pos<list.size(); pos++) { + if(field == list.get(pos)) { + break; + } + } + if(pos == list.size()) { + return -1; + } else { + return pos; + } + + } + + + + public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + @Override + public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception { + return null; + } + } + + public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>, + Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> { + + @Override + public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception { + return null; + } + } + + public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>, + Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, 
Integer>> { + + @Override + public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second, + Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception { + + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java index 1890597..f6ae01a 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java @@ -42,30 +42,34 @@ public class GlobalPropertiesMatchingTest { GlobalProperties gp1 = new GlobalProperties(); gp1.setAnyPartitioning(new FieldList(2, 6)); assertTrue(req.isMetBy(gp1)); - + GlobalProperties gp2 = new GlobalProperties(); gp2.setAnyPartitioning(new FieldList(6, 2)); assertTrue(req.isMetBy(gp2)); - + GlobalProperties gp3 = new GlobalProperties(); - gp3.setAnyPartitioning(new FieldList(6, 1)); + gp3.setAnyPartitioning(new FieldList(6, 2, 4)); assertFalse(req.isMetBy(gp3)); - + GlobalProperties gp4 = new GlobalProperties(); - gp4.setAnyPartitioning(new FieldList(2)); - assertTrue(req.isMetBy(gp4)); + gp4.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp4)); + + GlobalProperties gp5 = new GlobalProperties(); + gp5.setAnyPartitioning(new FieldList(2)); + assertTrue(req.isMetBy(gp5)); } - + // match hash partitioning { GlobalProperties gp1 = new GlobalProperties(); gp1.setHashPartitioned(new FieldList(2, 6)); assertTrue(req.isMetBy(gp1)); - + GlobalProperties gp2 = new GlobalProperties(); 
gp2.setHashPartitioned(new FieldList(6, 2)); assertTrue(req.isMetBy(gp2)); - + GlobalProperties gp3 = new GlobalProperties(); gp3.setHashPartitioned(new FieldList(6, 1)); assertFalse(req.isMetBy(gp3)); @@ -154,6 +158,136 @@ public class GlobalPropertiesMatchingTest { fail(e.getMessage()); } } + + @Test + public void testStrictlyMatchingAnyPartitioning() { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setAnyPartitioning(new FieldList(6, 2)); + + // match any partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 2, 3)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp4)); + + GlobalProperties gp5 = new GlobalProperties(); + gp5.setAnyPartitioning(new FieldList(2)); + assertFalse(req.isMetBy(gp5)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + } + + // match range partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); +
assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertFalse(req.isMetBy(gp4)); + } + + } + + @Test + public void testStrictlyMatchingHashPartitioning() { + + RequestedGlobalProperties req = new RequestedGlobalProperties(); + req.setHashPartitioned(new FieldList(6, 2)); + + // match any partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setAnyPartitioning(new FieldList(6, 2)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setAnyPartitioning(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setAnyPartitioning(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setAnyPartitioning(new FieldList(2)); + assertFalse(req.isMetBy(gp4)); + } + + // match hash partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setHashPartitioned(new FieldList(6, 2)); + assertTrue(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setHashPartitioned(new FieldList(2, 6)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setHashPartitioned(new FieldList(6, 1)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setHashPartitioned(new FieldList(6, 2, 0)); + assertFalse(req.isMetBy(gp4)); + } + + // match range partitioning + { + GlobalProperties gp1 = new GlobalProperties(); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp1)); + + GlobalProperties gp2 = new GlobalProperties(); + gp2.setRangePartitioned(new 
Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp2)); + + GlobalProperties gp3 = new GlobalProperties(); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + assertFalse(req.isMetBy(gp3)); + + GlobalProperties gp4 = new GlobalProperties(); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + assertFalse(req.isMetBy(gp4)); + } + + } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java index 3f9c0db..0bb72b8 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java @@ -423,7 +423,7 @@ public class RequestedGlobalPropertiesFilteringTest { SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); RequestedGlobalProperties gprops = new RequestedGlobalProperties(); - gprops.setHashPartitioned(new FieldList(0,1)); + gprops.setHashPartitioned(new FieldSet(0,1)); gprops.filterBySemanticProperties(sprops, 1); } http://git-wip-us.apache.org/repos/asf/flink/blob/b0a57c32/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index 1a76d49..ae0a722 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -117,7 +117,7 @@ public class FieldList extends FieldSet {
 	public FieldList toFieldList() {
 		return this;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -158,6 +158,19 @@ public class FieldList extends FieldSet {
 		}
 		return true;
 	}
+
+	/** Returns true iff {@code list} holds exactly the same field indices as this list, in the same order. */
+	public boolean isExactMatch(FieldList list) {
+		if (this.size() != list.size()) {
+			return false;
+		}
+		for (int i = 0; i < this.size(); i++) {
+			if (this.get(i).intValue() != list.get(i).intValue()) { // unbox: != on Integer objects compares references
+				return false;
+			}
+		}
+		return true;
+	}
 
 	// --------------------------------------------------------------------------------------------
 