[FLINK-1664] Adds check if a selected sort key is sortable. This closes #541.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f36eb54e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f36eb54e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f36eb54e Branch: refs/heads/master Commit: f36eb54ee6d8cc130439def98559b6b0a70b6c7b Parents: f39aec8 Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Mar 27 21:37:59 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Apr 3 20:42:05 2015 +0200 ---------------------------------------------------------------------- .../api/common/typeinfo/TypeInformation.java | 10 +- .../api/common/typeutils/CompositeType.java | 20 ++ .../flink/api/java/SortPartitionOperator.java | 30 +- .../flink/api/java/operators/DataSink.java | 28 ++ .../apache/flink/api/java/operators/Keys.java | 14 +- .../api/java/operators/SortedGrouping.java | 71 +++-- .../flink/api/java/typeutils/PojoTypeInfo.java | 7 +- .../api/java/typeutils/TupleTypeInfoBase.java | 19 -- .../flink/api/java/operator/DataSinkTest.java | 46 +-- .../flink/api/java/operator/GroupingTest.java | 278 +++++++++++++++++-- .../api/java/operator/SortPartitionTest.java | 204 ++++++++++++++ 11 files changed, 637 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java index 4fa02e3..bb50e32 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java @@ -132,7 +132,15 @@ public abstract class TypeInformation<T> 
implements Serializable { * @return True, if the type can be used as a key, false otherwise. */ public abstract boolean isKeyType(); - + + /** + * Checks whether this type can be used as a key for sorting. + * The order produced by sorting this type must be meaningful. + */ + public boolean isSortKeyType() { + return isKeyType(); + } + /** * Creates a serializer for the type. The serializer may use the ExecutionConfig * for parameterization. http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index 54a1e13..de39ec8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -169,6 +169,26 @@ public abstract class CompositeType<T> extends TypeInformation<T> { return getFieldIndex(fieldName) >= 0; } + @Override + public boolean isKeyType() { + for(int i=0;i<this.getArity();i++) { + if (!this.getTypeAt(i).isKeyType()) { + return false; + } + } + return true; + } + + @Override + public boolean isSortKeyType() { + for(int i=0;i<this.getArity();i++) { + if (!this.getTypeAt(i).isSortKeyType()) { + return false; + } + } + return true; + } + /** * Returns the names of the composite fields of this type. The order of the returned array must * be consistent with the internal field index ordering. 
http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java index c8f8bbc..988144b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java @@ -24,9 +24,11 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.operators.SingleInputOperator; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import java.util.Arrays; @@ -96,11 +98,23 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart private int[] getFlatFields(int field) { + if(!(super.getType() instanceof TupleTypeInfoBase<?>)) { + throw new InvalidProgramException("Field positions can only be specified on Tuple or " + + "Case Class types."); + } + else { + // check selected field is sortable type + TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) super.getType()).getTypeAt(field); + if (!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); + } + } + Keys.ExpressionKeys<T> ek; try { ek = new Keys.ExpressionKeys<T>(new int[]{field}, super.getType()); } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of field 
expression.", iae); + throw new InvalidProgramException("Invalid specification of field position.", iae); } return ek.computeLogicalKeyPositions(); } @@ -108,6 +122,13 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart private int[] getFlatFields(String fields) { if(super.getType() instanceof CompositeType) { + + // check selected field is sortable type + TypeInformation<?> sortKeyType = ((CompositeType<?>) super.getType()).getTypeAt(fields); + if (!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); + } + // compute flat field positions for (nested) sorting fields Keys.ExpressionKeys<T> ek; try { @@ -123,6 +144,12 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. " + "Use a field wildcard for that (\"*\" or \"_\")"); } else { + + // check if selected field is sortable type + if (!super.getType().isSortKeyType()) { + throw new InvalidProgramException("Selected sort key cannot be sorted: " + super.getType()); + } + return new int[]{0}; } } @@ -149,7 +176,6 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart } } - // -------------------------------------------------------------------------------------------- // Translation // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 83ec021..5b5b031 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.configuration.Configuration; import org.apache.flink.types.Nothing; import org.apache.flink.api.java.DataSet; @@ -114,6 +115,7 @@ public class DataSink<T> { if (field >= this.type.getArity()) { throw new InvalidProgramException("Order key out of tuple bounds."); } + isValidSortKeyType(field); // get flat keys Keys.ExpressionKeys<T> ek; @@ -166,9 +168,11 @@ public class DataSink<T> { Order[] orders; if(this.type instanceof CompositeType) { + // compute flat field positions for (nested) sorting fields Keys.ExpressionKeys<T> ek; try { + isValidSortKeyType(fieldExpression); ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type); } catch(IllegalArgumentException iae) { throw new InvalidProgramException("Invalid specification of field expression.", iae); @@ -183,6 +187,8 @@ public class DataSink<T> { throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. 
" + "Use a field wildcard for that (\"*\" or \"_\")"); } else { + isValidSortKeyType(fieldExpression); + numFields = 1; fields = new int[]{0}; orders = new Order[]{order}; @@ -208,6 +214,28 @@ public class DataSink<T> { return this; } + private void isValidSortKeyType(int field) { + TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) this.type).getTypeAt(field); + if (!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); + } + } + + private void isValidSortKeyType(String field) { + TypeInformation<?> sortKeyType; + + field = field.trim(); + if(field.equals("*") || field.equals("_")) { + sortKeyType = this.type; + } else { + sortKeyType = ((CompositeType<?>) this.type).getTypeAt(field); + } + + if (!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); + } + } + /** * @return Configuration for the OutputFormat. */ http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 2c067fd..a2cde07 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -82,19 +82,19 @@ public abstract class Keys<T> { this.keyExtractor = keyExtractor; this.keyType = keyType; - + + if(!keyType.isKeyType()) { + throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type"); + } + // we have to handle a special case here: - // if the keyType is a tuple type, we need to select the full tuple with all its fields. 
- if(keyType.isTupleType()) { + // if the keyType is a composite type, we need to select the full type with all its fields. + if(keyType instanceof CompositeType) { ExpressionKeys<K> ek = new ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR}, keyType); logicalKeyFields = ek.computeLogicalKeyPositions(); } else { logicalKeyFields = new int[] {0}; } - - if (!this.keyType.isKeyType()) { - throw new IllegalArgumentException("Invalid type of KeySelector keys"); - } } public TypeInformation<K> getKeyType() { http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 287bf82..4c6c952 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.TypeExtractor; import com.google.common.base.Preconditions; @@ -63,6 +64,8 @@ public class SortedGrouping<T> extends Grouping<T> { if (field >= dataSet.getType().getArity()) { throw new IllegalArgumentException("Order key out of tuple bounds."); } + isValidSortKeyType(field); + // use int-based expression key to properly resolve nested tuples for grouping ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType()); this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); @@ -79,6 +82,8 @@ public class 
SortedGrouping<T> extends Grouping<T> { if (!(dataSet.getType() instanceof CompositeType)) { throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); } + isValidSortKeyType(field); + // resolve String-field to int using the expression keys ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType()); this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); @@ -95,6 +100,10 @@ public class SortedGrouping<T> extends Grouping<T> { if (!(this.keys instanceof Keys.SelectorFunctionKeys)) { throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector grouping."); } + TypeInformation<?> sortKeyType = keySelector.getKeyType(); + if(!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Key type " + sortKeyType +" is not sortable."); + } this.groupSortKeyPositions = keySelector.computeLogicalKeyPositions(); for (int i = 0; i < groupSortKeyPositions.length; i++) { @@ -218,35 +227,22 @@ public class SortedGrouping<T> extends Grouping<T> { if (field >= dataSet.getType().getArity()) { throw new IllegalArgumentException("Order key out of tuple bounds."); } + isValidSortKeyType(field); + ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType()); addSortGroupInternal(ek, order); return this; } - - private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) { - Preconditions.checkArgument(order != null, "Order can not be null"); - int[] additionalKeyPositions = ek.computeLogicalKeyPositions(); - - int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length; - this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength); - this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength); - int pos = newLength - additionalKeyPositions.length; - int off = newLength - additionalKeyPositions.length; - for(;pos < newLength; pos++) { - 
this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off]; - this.groupSortOrders[pos] = order; // use the same order - } - } - + /** * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.</br> * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/> * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls. - * + * * @param field The Tuple or Pojo field on which the group is sorted. * @param order The Order in which the specified field is sorted. * @return A SortedGrouping with specified order of group element. - * + * * @see org.apache.flink.api.java.tuple.Tuple * @see Order */ @@ -257,9 +253,48 @@ public class SortedGrouping<T> extends Grouping<T> { if (! (dataSet.getType() instanceof CompositeType)) { throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); } + isValidSortKeyType(field); + ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType()); addSortGroupInternal(ek, order); return this; } + + private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) { + Preconditions.checkArgument(order != null, "Order can not be null"); + int[] additionalKeyPositions = ek.computeLogicalKeyPositions(); + + int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length; + this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength); + this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength); + int pos = newLength - additionalKeyPositions.length; + int off = newLength - additionalKeyPositions.length; + for(;pos < newLength; pos++) { + this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off]; + this.groupSortOrders[pos] = order; // use the same order + } + } + + private void isValidSortKeyType(int field) { + TypeInformation<?> sortKeyType = 
((TupleTypeInfoBase<?>) dataSet.getType()).getTypeAt(field); + if (!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); + } + } + + private void isValidSortKeyType(String field) { + TypeInformation<?> sortKeyType; + + field = field.trim(); + if(field.equals("*") || field.equals("_")) { + sortKeyType = this.getDataSet().getType(); + } else { + sortKeyType = ((CompositeType<?>) this.getDataSet().getType()).getTypeAt(field); + } + + if (!sortKeyType.isSortKeyType()) { + throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 1dcee24..2f3db7c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -118,8 +118,11 @@ public class PojoTypeInfo<T> extends CompositeType<T> { } @Override - public boolean isKeyType() { - return Comparable.class.isAssignableFrom(typeClass); + public boolean isSortKeyType() { + // Support for sorting POJOs that implement Comparable is not implemented yet. + // Since the order of fields in a POJO type is not well defined, sorting on fields + // gives only some undefined order. 
+ return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index d1c2c9d..5051449 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -223,11 +223,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { } @Override - public boolean isKeyType() { - return isValidKeyType(this); - } - - @Override public boolean equals(Object obj) { if (obj instanceof TupleTypeInfoBase) { @SuppressWarnings("unchecked") @@ -245,20 +240,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { return this.types.hashCode() ^ Arrays.deepHashCode(this.types); } - private boolean isValidKeyType(TypeInformation<?> typeInfo) { - if(typeInfo instanceof TupleTypeInfoBase) { - TupleTypeInfoBase<?> tupleType = ((TupleTypeInfoBase<?>)typeInfo); - for(int i=0;i<tupleType.getArity();i++) { - if (!isValidKeyType(tupleType.getTypeAt(i))) { - return false; - } - } - return true; - } else { - return typeInfo.isKeyType(); - } - } - @Override public String toString() { StringBuilder bld = new StringBuilder("Tuple"); http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java index 7a7ed14..37ad381 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java @@ -256,23 +256,6 @@ public class DataSinkTest { } @Test - public void testPojoSingleOrderFull() { - - final ExecutionEnvironment env = ExecutionEnvironment - .getExecutionEnvironment(); - DataSet<CustomType> pojoDs = env - .fromCollection(pojoData); - - // should work - try { - pojoDs.writeAsText("/tmp/willNotHappen") - .sortLocalOutput("*", Order.ASCENDING); - } catch (Exception e) { - Assert.fail(); - } - } - - @Test public void testPojoTwoOrder() { final ExecutionEnvironment env = ExecutionEnvironment @@ -317,6 +300,35 @@ public class DataSinkTest { .sortLocalOutput("notThere", Order.DESCENDING); } + @Test(expected = InvalidProgramException.class) + public void testPojoSingleOrderFull() { + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + DataSet<CustomType> pojoDs = env + .fromCollection(pojoData); + + // must not work + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("*", Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) + public void testArrayOrderFull() { + + List<Object[]> arrayData = new ArrayList<Object[]>(); + arrayData.add(new Object[0]); + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + DataSet<Object[]> pojoDs = env + .fromCollection(arrayData); + + // must not work + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("*", Order.ASCENDING); + } + /** * Custom data type, for testing purposes. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java index c958680..314695f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java @@ -24,13 +24,16 @@ import java.util.List; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.junit.Assert; import org.junit.Test; @@ -48,11 +51,23 @@ public class GroupingTest { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO ); - + + private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomInfo = new + TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>>( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + TypeExtractor.createTypeInfo(CustomType.class), + BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO + ); + // LONG DATA private final List<Long> emptyLongData = new ArrayList<Long>(); private final List<CustomType> customTypeData = new ArrayList<CustomType>(); + + private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData = + 
new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>(); + @Test public void testGroupByKeyFields1() { @@ -187,7 +202,6 @@ public class GroupingTest { // should not work, key out of tuple bounds ds.groupBy("nested.myNonExistent"); } - @Test @SuppressWarnings("serial") @@ -233,41 +247,67 @@ public class GroupingTest { Assert.fail(); } } - - @Test(expected=IllegalArgumentException.class) + + @Test @SuppressWarnings("serial") public void testGroupByKeySelector3() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); this.customTypeData.add(new CustomType()); - - DataSet<CustomType> customDs = env.fromCollection(customTypeData); - // should not work - customDs.groupBy( - new KeySelector<GroupingTest.CustomType, CustomType>() { - @Override - public CustomType getKey(CustomType value) { - return value; - } - }); + + try { + DataSet<CustomType> customDs = env.fromCollection(customTypeData); + // should not work + customDs.groupBy( + new KeySelector<GroupingTest.CustomType, CustomType>() { + @Override + public CustomType getKey(CustomType value) { + return value; + } + }); + } catch(Exception e) { + Assert.fail(); + } } - - @Test(expected=IllegalArgumentException.class) + + @Test @SuppressWarnings("serial") public void testGroupByKeySelector4() { - + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); this.customTypeData.add(new CustomType()); - + + try { + DataSet<CustomType> customDs = env.fromCollection(customTypeData); + // should not work + customDs.groupBy( + new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() { + @Override + public Tuple2<Integer, CustomType> getKey(CustomType value) { + return new Tuple2<Integer, CustomType>(value.myInt, value); + } + }); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test(expected = InvalidProgramException.class) + @SuppressWarnings("serial") + public void testGroupByKeySelector5() { + + final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + this.customTypeData.add(new CustomType()); + DataSet<CustomType> customDs = env.fromCollection(customTypeData); // should not work customDs.groupBy( - new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() { + new KeySelector<GroupingTest.CustomType, CustomType2>() { @Override - public Tuple2<Integer, CustomType> getKey(CustomType value) { - return new Tuple2<Integer, CustomType>(value.myInt, value); - } - }); + public CustomType2 getKey(CustomType value) { + return new CustomType2(); + } + }); } @Test @@ -313,6 +353,30 @@ public class GroupingTest { }).sortGroup(0, Order.ASCENDING); } + + @Test(expected = InvalidProgramException.class) + public void testGroupSortKeyFields4() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should not work + tupleDs.groupBy(0) + .sortGroup(2, Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) + public void testGroupSortKeyFields5() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should not work + tupleDs.groupBy(0) + .sortGroup(3, Order.ASCENDING); + } @Test public void testChainedGroupSortKeyFields() { @@ -327,7 +391,166 @@ public class GroupingTest { Assert.fail(); } } - + + @Test + public void testGroupSortByKeyExpression1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should work + try { + tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test + public 
void testGroupSortByKeyExpression2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should work + try { + tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test + public void testGroupSortByKeyExpression3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should work + try { + tupleDs.groupBy("f0") + .sortGroup("f2.myString", Order.ASCENDING) + .sortGroup("f1", Order.DESCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test(expected = InvalidProgramException.class) + public void testGroupSortByKeyExpression4() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should not work + tupleDs.groupBy("f0") + .sortGroup("f2", Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) + public void testGroupSortByKeyExpression5() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should not work + tupleDs.groupBy("f0") + .sortGroup("f1", Order.ASCENDING) + .sortGroup("f2", Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) + public void testGroupSortByKeyExpression6() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + 
+ // should not work + tupleDs.groupBy("f0") + .sortGroup("f3", Order.ASCENDING); + } + + @SuppressWarnings("serial") + @Test + public void testGroupSortByKeySelector1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should work + tupleDs.groupBy( + new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() { + @Override + public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f1; + } + }) + .sortGroup( + new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() { + @Override + public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f0; + } + }, Order.ASCENDING); + } + + @SuppressWarnings("serial") + @Test(expected = InvalidProgramException.class) + public void testGroupSortByKeySelector2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should not work + tupleDs.groupBy( + new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() { + @Override + public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f1; + } + }) + .sortGroup( + new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, CustomType>() { + @Override + public CustomType getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f2; + } + }, Order.ASCENDING); + } + + @SuppressWarnings("serial") + @Test(expected = InvalidProgramException.class) + public void testGroupSortByKeySelector3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = + 
env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should not work + tupleDs.groupBy( + new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() { + @Override + public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f1; + } + }) + .sortGroup( + new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long[]>() { + @Override + public Long[] getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f3; + } + }, Order.ASCENDING); + } + public static class CustomType implements Serializable { @@ -354,4 +577,11 @@ public class GroupingTest { return myInt+","+myLong+","+myString; } } + + public static class CustomType2 implements Serializable { + + public int myInt; + public int[] myIntArray; + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java new file mode 100644 index 0000000..a4e2bbc --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.operator; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class SortPartitionTest { + + // TUPLE DATA + private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = + new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>(); + + private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new + TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO + ); + + private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomInfo = new + TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>>( + BasicTypeInfo.INT_TYPE_INFO, + 
BasicTypeInfo.LONG_TYPE_INFO, + TypeExtractor.createTypeInfo(CustomType.class), + BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO + ); + + // LONG DATA + private final List<Long> emptyLongData = new ArrayList<Long>(); + + private final List<CustomType> customTypeData = new ArrayList<CustomType>(); + + private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData = + new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>(); + + + @Test + public void testSortPartitionPositionKeys1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should work + try { + tupleDs.sortPartition(0, Order.ASCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test + public void testSortPartitionPositionKeys2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should work + try { + tupleDs + .sortPartition(0, Order.ASCENDING) + .sortPartition(3, Order.DESCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test(expected = InvalidProgramException.class) + public void testSortPartitionWithPositionKeys3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // must not work + tupleDs.sortPartition(2, Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) + public void testSortPartitionWithPositionKeys4() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // must not work + tupleDs.sortPartition(3, 
Order.ASCENDING); + } + + @Test + public void testSortPartitionExpressionKeys1() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should work + try { + tupleDs.sortPartition("f1", Order.ASCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test + public void testSortPartitionExpressionKeys2() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should work + try { + tupleDs + .sortPartition("f0", Order.ASCENDING) + .sortPartition("f2.nested.myInt", Order.DESCENDING); + } catch(Exception e) { + Assert.fail(); + } + } + + @Test(expected = InvalidProgramException.class) + public void testSortPartitionWithExpressionKeys3() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // must not work + tupleDs.sortPartition("f2.nested", Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) + public void testSortPartitionWithExpressionKeys4() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // must not work + tupleDs.sortPartition("f3", Order.ASCENDING); + } + + public static class CustomType implements Serializable { + + public static class Nest { + public int myInt; + } + private static final long serialVersionUID = 1L; + + public int myInt; + public long myLong; + public String myString; + public Nest nested; + + public CustomType() {}; + + public CustomType(int i, long l, String s) { + myInt = i; 
+ myLong = l; + myString = s; + } + + @Override + public String toString() { + return myInt+","+myLong+","+myString; + } + } + + public static class CustomType2 implements Serializable { + + public int myInt; + public int[] myIntArray; + + } +}