[FLINK-1666][FLINK-1903][FLINK-3233] Refactor expression keys. - Unify usage of expression keys: FLINK-1666 - Unify supported key expressions (incl. support for partitioning on atomic types: FLINK-3233) - Stop removing duplicate keys: FLINK-1903 - Unify checks for sort keys - Add more tests for SelectorFunctionKeys and ExpressionKeys
This closes #1520 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af029e7e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af029e7e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af029e7e Branch: refs/heads/master Commit: af029e7e0c6377ec12edafedb4d6ea53c4fa9fe9 Parents: 9840136 Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Jan 14 22:48:43 2016 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Jan 26 11:16:58 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 12 +- .../flink/api/java/io/SplitDataProperties.java | 52 +- .../api/java/operators/CoGroupOperator.java | 31 +- .../flink/api/java/operators/DataSink.java | 77 +-- .../api/java/operators/DistinctOperator.java | 2 +- .../apache/flink/api/java/operators/Keys.java | 444 ++++++++--------- .../api/java/operators/PartitionOperator.java | 5 - .../java/operators/SortPartitionOperator.java | 62 +-- .../api/java/operators/SortedGrouping.java | 70 +-- .../api/java/operator/CoGroupOperatorTest.java | 8 +- .../flink/api/java/operator/DataSinkTest.java | 79 +-- .../api/java/operator/DistinctOperatorTest.java | 2 +- .../operator/FullOuterJoinOperatorTest.java | 2 +- .../flink/api/java/operator/GroupingTest.java | 6 +- .../api/java/operator/JoinOperatorTest.java | 8 +- .../operator/LeftOuterJoinOperatorTest.java | 2 +- .../operator/RightOuterJoinOperatorTest.java | 2 +- .../api/java/operators/ExpressionKeysTest.java | 479 +++++++++++++++++++ .../flink/api/java/operators/KeysTest.java | 284 ----------- .../operators/SelectorFunctionKeysTest.java | 153 ++++++ .../api/java/table/JavaBatchTranslator.scala | 4 +- .../table/typeinfo/RenamingProxyTypeInfo.scala | 21 +- .../apache/flink/api/scala/CoGroupDataSet.scala | 17 +- .../org/apache/flink/api/scala/DataSet.scala | 14 +- .../apache/flink/api/scala/GroupedDataSet.scala | 18 +- 
.../test/javaApiOperators/DataSinkITCase.java | 54 ++- .../test/javaApiOperators/PartitionITCase.java | 79 ++- .../scala/operators/CoGroupOperatorTest.scala | 8 +- .../scala/operators/DistinctOperatorTest.scala | 2 +- .../api/scala/operators/GroupingTest.scala | 6 +- .../api/scala/operators/JoinOperatorTest.scala | 8 +- 31 files changed, 1142 insertions(+), 869 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index be84032..5633185 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -601,7 +601,7 @@ public abstract class DataSet<T> { * @return A DistinctOperator that represents the distinct DataSet. */ public DistinctOperator<T> distinct(int... fields) { - return new DistinctOperator<>(this, new Keys.ExpressionKeys<>(fields, getType(), true), Utils.getCallLocationName()); + return new DistinctOperator<>(this, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName()); } /** @@ -686,7 +686,7 @@ public abstract class DataSet<T> { * @see DataSet */ public UnsortedGrouping<T> groupBy(int... 
fields) { - return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType(), false)); + return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType())); } /** @@ -1165,7 +1165,7 @@ public abstract class DataSet<T> { Preconditions.checkNotNull(workset); Preconditions.checkNotNull(keyPositions); - Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<>(keyPositions, getType(), false); + Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<>(keyPositions, getType()); return new DeltaIteration<>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations); } @@ -1214,7 +1214,7 @@ public abstract class DataSet<T> { * @return The partitioned DataSet. */ public PartitionOperator<T> partitionByHash(int... fields) { - return new PartitionOperator<>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<>(fields, getType(), false), Utils.getCallLocationName()); + return new PartitionOperator<>(this, PartitionMethod.HASH, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName()); } /** @@ -1254,7 +1254,7 @@ public abstract class DataSet<T> { * @return The partitioned DataSet. */ public PartitionOperator<T> partitionByRange(int... fields) { - return new PartitionOperator<>(this, PartitionMethod.RANGE, new Keys.ExpressionKeys<>(fields, getType(), false), Utils.getCallLocationName()); + return new PartitionOperator<>(this, PartitionMethod.RANGE, new Keys.ExpressionKeys<>(fields, getType()), Utils.getCallLocationName()); } /** @@ -1297,7 +1297,7 @@ public abstract class DataSet<T> { * @return The partitioned DataSet. 
*/ public <K> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, int field) { - return new PartitionOperator<>(this, new Keys.ExpressionKeys<>(new int[] {field}, getType(), false), clean(partitioner), Utils.getCallLocationName()); + return new PartitionOperator<>(this, new Keys.ExpressionKeys<>(new int[] {field}, getType()), clean(partitioner), Utils.getCallLocationName()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java index 56d2a64..c44929f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.Keys; @@ -124,7 +123,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr this.splitPartitionKeys = getAllFlatKeys(partitionFields); if (partitionMethodId != null) { - this.splitPartitioner = new SourcePartitionerMarker<T>(partitionMethodId); + this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId); } else { this.splitPartitioner = null; } @@ -175,7 +174,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr this.splitPartitionKeys = getAllFlatKeys(partitionKeysA); if(partitionMethodId != null) { - this.splitPartitioner 
= new SourcePartitionerMarker<T>(partitionMethodId); + this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId); } else { this.splitPartitioner = null; @@ -331,7 +330,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr for(int i=0; i<orderKeysA.length; i++) { String keyExp = orderKeysA[i]; - int[] flatKeys = this.computeFlatKeys(keyExp); + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type); + int[] flatKeys = ek.computeLogicalKeyPositions(); for(int key : flatKeys) { // check for duplicates @@ -371,7 +371,9 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr int[] allKeys = null; for(String keyExp : fieldExpressions) { - int[] flatKeys = this.computeFlatKeys(keyExp); + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type); + int[] flatKeys = ek.computeLogicalKeyPositions(); + if(allKeys == null) { allKeys = flatKeys; } else { @@ -387,9 +389,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr int oldLength = allKeys.length; int newLength = oldLength + flatKeys.length; allKeys = Arrays.copyOf(allKeys, newLength); - for(int i=0;i<flatKeys.length; i++) { - allKeys[oldLength+i] = flatKeys[i]; - } + System.arraycopy(flatKeys, 0, allKeys, oldLength, flatKeys.length); } } @@ -397,47 +397,17 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr } private int[] getAllFlatKeys(int[] fieldPositions) { - - Keys.ExpressionKeys<T> ek; - try { - ek = new Keys.ExpressionKeys<T>(fieldPositions, this.type); - } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of field expression.", iae); - } + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(fieldPositions, this.type); return ek.computeLogicalKeyPositions(); } - - private int[] computeFlatKeys(String fieldExpression) { - - fieldExpression = fieldExpression.trim(); - - if(this.type instanceof 
CompositeType) { - // compute flat field positions for (nested) sorting fields - Keys.ExpressionKeys<T> ek; - try { - ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type); - } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of field expression.", iae); - } - return ek.computeLogicalKeyPositions(); - } else { - fieldExpression = fieldExpression.trim(); - if (!(fieldExpression.equals("*") || fieldExpression.equals("_"))) { - throw new InvalidProgramException("Data properties on non-composite types can only be defined on the full type. " + - "Use a field wildcard for that (\"*\" or \"_\")"); - } else { - return new int[]{0}; - } - } - } - /** * A custom partitioner to mark compatible split partitionings. * * @param <T> The type of the partitioned data. */ public static class SourcePartitionerMarker<T> implements Partitioner<T> { + private static final long serialVersionUID = -8554223652384442571L; String partitionMarker; @@ -454,7 +424,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr @Override public boolean equals(Object o) { if(o instanceof SourcePartitionerMarker) { - return this.partitionMarker.equals(((SourcePartitionerMarker) o).partitionMarker); + return this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker); } else { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index ca41fc5..845deb4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -35,7 +35,6 
@@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.SemanticPropUtil; @@ -633,13 +632,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @see Order */ public CoGroupOperatorWithoutFunction sortFirstGroup(int field, Order order) { - if (!input1.getType().isTupleType()) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types"); - } - if (field >= input1.getType().getArity()) { - throw new IllegalArgumentException("Order key out of tuple bounds."); - } - ExpressionKeys<I1> ek = new ExpressionKeys<>(new int[]{field}, input1.getType()); + + ExpressionKeys<I1> ek = new ExpressionKeys<>(field, input1.getType()); int[] groupOrderKeys = ek.computeLogicalKeyPositions(); for (int key : groupOrderKeys) { @@ -663,13 +657,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @see Order */ public CoGroupOperatorWithoutFunction sortSecondGroup(int field, Order order) { - if (!input2.getType().isTupleType()) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types"); - } - if (field >= input2.getType().getArity()) { - throw new IllegalArgumentException("Order key out of tuple bounds."); - } - ExpressionKeys<I2> ek = new ExpressionKeys<>(new int[]{field}, input2.getType()); + + ExpressionKeys<I2> ek = new ExpressionKeys<>(field, input2.getType()); int[] groupOrderKeys = ek.computeLogicalKeyPositions(); for (int key : groupOrderKeys) { @@ -691,10 +680,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, 
I2, OU * @see Order */ public CoGroupOperatorWithoutFunction sortFirstGroup(String fieldExpression, Order order) { - if (! (input1.getType() instanceof CompositeType)) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); - } - ExpressionKeys<I1> ek = new ExpressionKeys<>(new String[]{fieldExpression}, input1.getType()); + + ExpressionKeys<I1> ek = new ExpressionKeys<>(fieldExpression, input1.getType()); int[] groupOrderKeys = ek.computeLogicalKeyPositions(); for (int key : groupOrderKeys) { @@ -716,10 +703,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @see Order */ public CoGroupOperatorWithoutFunction sortSecondGroup(String fieldExpression, Order order) { - if (! (input2.getType() instanceof CompositeType)) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); - } - ExpressionKeys<I2> ek = new ExpressionKeys<>(new String[]{fieldExpression}, input2.getType()); + + ExpressionKeys<I2> ek = new ExpressionKeys<>(fieldExpression, input2.getType()); int[] groupOrderKeys = ek.computeLogicalKeyPositions(); for (int key : groupOrderKeys) { http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 3371665..915a053 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -27,8 +27,6 @@ import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import 
org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.java.DataSet; @@ -109,23 +107,14 @@ public class DataSink<T> { @Deprecated public DataSink<T> sortLocalOutput(int field, Order order) { - if (!this.type.isTupleType()) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types"); - } - if (field >= this.type.getArity()) { - throw new InvalidProgramException("Order key out of tuple bounds."); - } - isValidSortKeyType(field); - // get flat keys - Keys.ExpressionKeys<T> ek; - try { - ek = new Keys.ExpressionKeys<>(new int[]{field}, this.type); - } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of field expression.", iae); - } + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(field, this.type); int[] flatKeys = ek.computeLogicalKeyPositions(); + if (!Keys.ExpressionKeys.isSortKey(field, this.type)) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); + } + if(this.sortKeyPositions == null) { // set sorting info this.sortKeyPositions = flatKeys; @@ -168,34 +157,18 @@ public class DataSink<T> { int[] fields; Order[] orders; - if(this.type instanceof CompositeType) { + // compute flat field positions for (nested) sorting fields + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(fieldExpression, this.type); + fields = ek.computeLogicalKeyPositions(); - // compute flat field positions for (nested) sorting fields - Keys.ExpressionKeys<T> ek; - try { - isValidSortKeyType(fieldExpression); - ek = new Keys.ExpressionKeys<>(new String[]{fieldExpression}, this.type); - } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of 
field expression.", iae); - } - fields = ek.computeLogicalKeyPositions(); - numFields = fields.length; - orders = new Order[numFields]; - Arrays.fill(orders, order); - } else { - fieldExpression = fieldExpression.trim(); - if (!(fieldExpression.equals("*") || fieldExpression.equals("_"))) { - throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. " + - "Use a field wildcard for that (\"*\" or \"_\")"); - } else { - isValidSortKeyType(fieldExpression); - - numFields = 1; - fields = new int[]{0}; - orders = new Order[]{order}; - } + if (!Keys.ExpressionKeys.isSortKey(fieldExpression, this.type)) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); } + numFields = fields.length; + orders = new Order[numFields]; + Arrays.fill(orders, order); + if(this.sortKeyPositions == null) { // set sorting info this.sortKeyPositions = fields; @@ -215,28 +188,6 @@ public class DataSink<T> { return this; } - private void isValidSortKeyType(int field) { - TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) this.type).getTypeAt(field); - if (!sortKeyType.isSortKeyType()) { - throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); - } - } - - private void isValidSortKeyType(String field) { - TypeInformation<?> sortKeyType; - - field = field.trim(); - if(field.equals("*") || field.equals("_")) { - sortKeyType = this.type; - } else { - sortKeyType = ((CompositeType<?>) this.type).getTypeAt(field); - } - - if (!sortKeyType.isSortKeyType()) { - throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); - } - } - /** * @return Configuration for the OutputFormat. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index d1d208a..d85c9a6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -50,7 +50,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera // if keys is null distinction is done on all fields if (keys == null) { - keys = new Keys.ExpressionKeys<>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType()); + keys = new Keys.ExpressionKeys<>(input.getType()); } this.keys = keys; http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java index 95ca300..5992f0b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java @@ -19,8 +19,6 @@ package org.apache.flink.api.java.operators; import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import com.google.common.base.Joiner; @@ -31,7 +29,6 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; -import 
org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; @@ -43,34 +40,44 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; - public abstract class Keys<T> { - private static final Logger LOG = LoggerFactory.getLogger(Keys.class); public abstract int getNumberOfKeyFields(); + public abstract int[] computeLogicalKeyPositions(); + + protected abstract TypeInformation<?>[] getKeyFieldTypes(); + + public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo); + public boolean isEmpty() { return getNumberOfKeyFields() == 0; } - + /** * Check if two sets of keys are compatible to each other (matching types, key counts) */ - public abstract boolean areCompatible(Keys<?> other) throws IncompatibleKeysException; - - public abstract int[] computeLogicalKeyPositions(); - - public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo); - - + public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { + + TypeInformation<?>[] thisKeyFieldTypes = this.getKeyFieldTypes(); + TypeInformation<?>[] otherKeyFieldTypes = other.getKeyFieldTypes(); + + if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) { + throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); + } else { + for (int i = 0; i < thisKeyFieldTypes.length; i++) { + 
if (!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) { + throw new IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] ); + } + } + } + return true; + } + // -------------------------------------------------------------------------------------------- // Specializations for expression-based / extractor-based grouping // -------------------------------------------------------------------------------------------- @@ -81,31 +88,30 @@ public abstract class Keys<T> { private final KeySelector<T, K> keyExtractor; private final TypeInformation<T> inputType; private final TypeInformation<K> keyType; - private final int[] logicalKeyFields; + private final List<FlatFieldDescriptor> keyFields; public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) { + if (keyExtractor == null) { throw new NullPointerException("Key extractor must not be null."); } if (keyType == null) { throw new NullPointerException("Key type must not be null."); } + if (!keyType.isKeyType()) { + throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type"); + } this.keyExtractor = keyExtractor; this.inputType = inputType; this.keyType = keyType; - if(!keyType.isKeyType()) { - throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type"); + if (keyType instanceof CompositeType) { + this.keyFields = ((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR); } - - // we have to handle a special case here: - // if the keyType is a composite type, we need to select the full type with all its fields. 
- if(keyType instanceof CompositeType) { - ExpressionKeys<K> ek = new ExpressionKeys<>(new String[]{ExpressionKeys.SELECT_ALL_CHAR}, keyType); - logicalKeyFields = ek.computeLogicalKeyPositions(); - } else { - logicalKeyFields = new int[] {0}; + else { + this.keyFields = new ArrayList<>(1); + this.keyFields.add(new FlatFieldDescriptor(0, keyType)); } } @@ -123,61 +129,36 @@ public abstract class Keys<T> { @Override public int getNumberOfKeyFields() { - return logicalKeyFields.length; + return keyFields.size(); } @Override - public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { - - if (other instanceof SelectorFunctionKeys) { - @SuppressWarnings("unchecked") - SelectorFunctionKeys<?, K> sfk = (SelectorFunctionKeys<?, K>) other; - - return sfk.keyType.equals(this.keyType); - } - else if (other instanceof ExpressionKeys) { - ExpressionKeys<?> expressionKeys = (ExpressionKeys<?>) other; - - if(keyType.isTupleType()) { - // special case again: - TupleTypeInfoBase<?> tupleKeyType = (TupleTypeInfoBase<?>) keyType; - List<FlatFieldDescriptor> keyTypeFields = tupleKeyType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR); - if(expressionKeys.keyFields.size() != keyTypeFields.size()) { - throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); - } - for(int i=0; i < expressionKeys.keyFields.size(); i++) { - if(!expressionKeys.keyFields.get(i).getType().equals(keyTypeFields.get(i).getType())) { - throw new IncompatibleKeysException(expressionKeys.keyFields.get(i).getType(), keyTypeFields.get(i).getType() ); - } - } - return true; - } - if(expressionKeys.getNumberOfKeyFields() != 1) { - throw new IncompatibleKeysException("Key selector functions are only compatible to one key"); - } - - if(expressionKeys.keyFields.get(0).getType().equals(this.keyType)) { - return true; - } else { - throw new IncompatibleKeysException(expressionKeys.keyFields.get(0).getType(), this.keyType); - } - } else { - throw new 
IncompatibleKeysException("The key is not compatible with "+other); + public int[] computeLogicalKeyPositions() { + int[] logicalKeys = new int[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + logicalKeys[i] = keyFields.get(i).getPosition(); } + return logicalKeys; } @Override - public int[] computeLogicalKeyPositions() { - return logicalKeyFields; + protected TypeInformation<?>[] getKeyFieldTypes() { + TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + fieldTypes[i] = keyFields.get(i).getType(); + } + return fieldTypes; } @Override public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) { - if (logicalKeyFields.length != 1) { + + if (keyFields.size() != 1) { throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field."); } if (typeInfo == null) { + // try to extract key type from partitioner try { typeInfo = TypeExtractor.getPartitionerTypes(partitioner); } @@ -185,10 +166,14 @@ public abstract class Keys<T> { // best effort check, so we ignore exceptions } } - - if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo) && (!keyType.equals(typeInfo))) { - throw new InvalidProgramException("The partitioner is incompatible with the key type. " + + // only check if type is known and not a generic type + if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) { + // check equality of key and partitioner type + if (!keyType.equals(typeInfo)) { + throw new InvalidProgramException("The partitioner is incompatible with the key type. 
" + "Partitioner type: " + typeInfo + " , key type: " + keyType); + } } } @@ -281,132 +266,144 @@ public abstract class Keys<T> { /** - * Represents (nested) field access through string and integer-based keys for Composite Types (Tuple or Pojo) + * Represents (nested) field access through string and integer-based keys */ public static class ExpressionKeys<T> extends Keys<T> { public static final String SELECT_ALL_CHAR = "*"; public static final String SELECT_ALL_CHAR_SCALA = "_"; + // Flattened fields representing keys fields + private List<FlatFieldDescriptor> keyFields; + /** - * Flattened fields representing keys fields + * ExpressionKeys that is defined by the full data type. */ - private List<FlatFieldDescriptor> keyFields; - + public ExpressionKeys(TypeInformation<T> type) { + this(SELECT_ALL_CHAR, type); + } + /** - * two constructors for field-based (tuple-type) keys + * Create int-based (non-nested) field position keys on a tuple type. */ - public ExpressionKeys(int[] groupingFields, TypeInformation<T> type) { - this(groupingFields, type, false); + public ExpressionKeys(int keyPosition, TypeInformation<T> type) { + this(new int[]{keyPosition}, type, false); } - // int-defined field - public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean allowEmpty) { - if (!type.isTupleType()) { + /** + * Create int-based (non-nested) field position keys on a tuple type. + */ + public ExpressionKeys(int[] keyPositions, TypeInformation<T> type) { + this(keyPositions, type, false); + } + + /** + * Create int-based (non-nested) field position keys on a tuple type. + */ + public ExpressionKeys(int[] keyPositions, TypeInformation<T> type, boolean allowEmpty) { + + if (!type.isTupleType() || !(type instanceof CompositeType)) { throw new InvalidProgramException("Specifying keys via field positions is only valid " + "for tuple data types. Type: " + type); } if (type.getArity() == 0) { throw new InvalidProgramException("Tuple size must be greater than 0. 
Size: " + type.getArity()); } - - if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) { + if (!allowEmpty && (keyPositions == null || keyPositions.length == 0)) { throw new IllegalArgumentException("The grouping fields must not be empty."); } - // select all fields. Therefore, set all fields on this tuple level and let the logic handle the rest - // (makes type assignment easier). - if (groupingFields == null || groupingFields.length == 0) { - groupingFields = new int[type.getArity()]; - for (int i = 0; i < groupingFields.length; i++) { - groupingFields[i] = i; - } + + this.keyFields = new ArrayList<>(); + + if (keyPositions == null || keyPositions.length == 0) { + // use all tuple fields as key fields + keyPositions = createIncrIntArray(type.getArity()); } else { - groupingFields = rangeCheckFields(groupingFields, type.getArity() -1); + rangeCheckFields(keyPositions, type.getArity() - 1); } - Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point"); - - keyFields = new ArrayList<>(type.getTotalFields()); - // for each key, find the field: - for (int keyPos : groupingFields) { - int offset = 0; - for (int i = 0; i < type.getArity(); i++) { - - TypeInformation<?> fieldType = ((CompositeType<?>) type).getTypeAt(i); - if (i < keyPos) { - // not yet there, increment key offset - offset += fieldType.getTotalFields(); - } else { - // arrived at key position - if (!fieldType.isKeyType()) { - throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key."); - } - if (fieldType instanceof CompositeType) { - // add all nested fields of composite type - ((CompositeType<?>) fieldType).getFlatFields("*", offset, keyFields); - } else if (fieldType instanceof AtomicType) { - // add atomic type field - keyFields.add(new FlatFieldDescriptor(offset, fieldType)); - } else { - // type should either be composite or atomic - throw new InvalidProgramException("Field type is neither 
CompositeType nor AtomicType: " + fieldType); - } - // go to next key - break; + Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point"); + + // extract key field types + CompositeType<T> cType = (CompositeType<T>)type; + this.keyFields = new ArrayList<>(type.getTotalFields()); + + // for each key position, find all (nested) field types + String[] fieldNames = cType.getFieldNames(); + ArrayList<FlatFieldDescriptor> tmpList = new ArrayList<>(); + for (int keyPos : keyPositions) { + tmpList.clear(); + // get all flat fields + cType.getFlatFields(fieldNames[keyPos], 0, tmpList); + // check if fields are of key type + for(FlatFieldDescriptor ffd : tmpList) { + if(!ffd.getType().isKeyType()) { + throw new InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as key."); } } + this.keyFields.addAll(tmpList); } - keyFields = removeNullElementsFromList(keyFields); } - public static <R> List<R> removeNullElementsFromList(List<R> in) { - List<R> elements = new ArrayList<>(); - for(R e: in) { - if(e != null) { - elements.add(e); - } - } - return elements; + /** + * Create String-based (nested) field expression keys on a composite type. + */ + public ExpressionKeys(String keyExpression, TypeInformation<T> type) { + this(new String[]{keyExpression}, type); } - + /** - * Create ExpressionKeys from String-expressions + * Create String-based (nested) field expression keys on a composite type. 
*/ - public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) { - Preconditions.checkNotNull(expressionsIn, "Field expression cannot be null."); + public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) { + Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null."); - if (type instanceof AtomicType) { - if (!type.isKeyType()) { - throw new InvalidProgramException("This type (" + type + ") cannot be used as key."); - } else if (expressionsIn.length != 1 || !(Keys.ExpressionKeys.SELECT_ALL_CHAR.equals(expressionsIn[0]) || Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA.equals(expressionsIn[0]))) { - throw new InvalidProgramException("Field expression for atomic type must be equal to '*' or '_'."); - } + this.keyFields = new ArrayList<>(keyExpressions.length); - keyFields = new ArrayList<>(1); - keyFields.add(new FlatFieldDescriptor(0, type)); - } else { + if (type instanceof CompositeType){ CompositeType<T> cType = (CompositeType<T>) type; - String[] expressions = removeDuplicates(expressionsIn); - if(expressionsIn.length != expressions.length) { - LOG.warn("The key expressions contained duplicates. 
They are now unique"); - } // extract the keys on their flat position - keyFields = new ArrayList<>(expressions.length); - for (String expression : expressions) { - List<FlatFieldDescriptor> keys = cType.getFlatFields(expression); // use separate list to do a size check - for (FlatFieldDescriptor key : keys) { - TypeInformation<?> keyType = key.getType(); - if (!keyType.isKeyType()) { - throw new InvalidProgramException("This type (" + key.getType() + ") cannot be used as key."); - } - if (!(keyType instanceof AtomicType || keyType instanceof CompositeType)) { - throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType); + for (String keyExpr : keyExpressions) { + if (keyExpr == null) { + throw new InvalidProgramException("Expression key may not be null."); + } + // strip off whitespace + keyExpr = keyExpr.trim(); + + List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr); + + if (flatFields.size() == 0) { + throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType); + } + // check if all nested fields can be used as keys + for (FlatFieldDescriptor field : flatFields) { + if (!field.getType().isKeyType()) { + throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key."); } } - if (keys.size() == 0) { - throw new InvalidProgramException("Unable to extract key from expression '" + expression + "' on key " + cType); + // add flat fields to key fields + keyFields.addAll(flatFields); + } + } + else { + if (!type.isKeyType()) { + throw new InvalidProgramException("This type (" + type + ") cannot be used as key."); + } + + // check that all key expressions are valid + for (String keyExpr : keyExpressions) { + if (keyExpr == null) { + throw new InvalidProgramException("Expression key may not be null."); + } + // strip off whitespace + keyExpr = keyExpr.trim(); + // check that full type is addressed + if (!(SELECT_ALL_CHAR.equals(keyExpr) 
|| SELECT_ALL_CHAR_SCALA.equals(keyExpr))) { + throw new InvalidProgramException( + "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types."); } - keyFields.addAll(keys); + // add full type as key + keyFields.add(new FlatFieldDescriptor(0, type)); } } } @@ -420,43 +417,32 @@ public abstract class Keys<T> { } @Override - public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { - - if (other instanceof ExpressionKeys) { - ExpressionKeys<?> oKey = (ExpressionKeys<?>) other; - - if(oKey.getNumberOfKeyFields() != this.getNumberOfKeyFields() ) { - throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); - } - for(int i=0; i < this.keyFields.size(); i++) { - if(!this.keyFields.get(i).getType().equals(oKey.keyFields.get(i).getType())) { - throw new IncompatibleKeysException(this.keyFields.get(i).getType(), oKey.keyFields.get(i).getType() ); - } - } - return true; - } else if(other instanceof SelectorFunctionKeys<?, ?>) { - return other.areCompatible(this); - } else { - throw new IncompatibleKeysException("The key is not compatible with "+other); + public int[] computeLogicalKeyPositions() { + int[] logicalKeys = new int[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + logicalKeys[i] = keyFields.get(i).getPosition(); } + return logicalKeys; } @Override - public int[] computeLogicalKeyPositions() { - List<Integer> logicalKeys = new ArrayList<>(); - for (FlatFieldDescriptor kd : keyFields) { - logicalKeys.add(kd.getPosition()); + protected TypeInformation<?>[] getKeyFieldTypes() { + TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + fieldTypes[i] = keyFields.get(i).getType(); } - return Ints.toArray(logicalKeys); + return fieldTypes; } - + @Override public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) { + if (keyFields.size() != 
1) { throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field."); } - + if (typeInfo == null) { + // try to extract key type from partitioner try { typeInfo = TypeExtractor.getPartitionerTypes(partitioner); } @@ -464,8 +450,10 @@ public abstract class Keys<T> { // best effort check, so we ignore exceptions } } - + if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) { + // only check type compatibility if type is known and not a generic type + TypeInformation<?> keyType = keyFields.get(0).getType(); if (!keyType.equals(typeInfo)) { throw new InvalidProgramException("The partitioner is incompatible with the key type. " @@ -479,51 +467,71 @@ public abstract class Keys<T> { Joiner join = Joiner.on('.'); return "ExpressionKeys: " + join.join(keyFields); } - } - - private static String[] removeDuplicates(String[] in) { - List<String> ret = new LinkedList<>(); - for(String el : in) { - if(!ret.contains(el)) { - ret.add(el); + + public static boolean isSortKey(int fieldPos, TypeInformation<?> type) { + + if (!type.isTupleType() || !(type instanceof CompositeType)) { + throw new InvalidProgramException("Specifying keys via field positions is only valid " + + "for tuple data types. Type: " + type); + } + if (type.getArity() == 0) { + throw new InvalidProgramException("Tuple size must be greater than 0. 
Size: " + type.getArity()); + } + + if(fieldPos < 0 || fieldPos >= type.getArity()) { + throw new IndexOutOfBoundsException("Tuple position is out of range: " + fieldPos); + } + + TypeInformation<?> sortKeyType = ((CompositeType<?>)type).getTypeAt(fieldPos); + return sortKeyType.isSortKeyType(); + } + + public static boolean isSortKey(String fieldExpr, TypeInformation<?> type) { + + TypeInformation<?> sortKeyType; + + fieldExpr = fieldExpr.trim(); + if (SELECT_ALL_CHAR.equals(fieldExpr) || SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) { + sortKeyType = type; + } + else { + if (type instanceof CompositeType) { + sortKeyType = ((CompositeType<?>) type).getTypeAt(fieldExpr); + } + else { + throw new InvalidProgramException( + "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic types."); + } } + + return sortKeyType.isSortKeyType(); } - return ret.toArray(new String[ret.size()]); + } + // -------------------------------------------------------------------------------------------- - - + + + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- - private static int[] rangeCheckFields(int[] fields, int maxAllowedField) { - - // range check and duplicate eliminate - int i = 1, k = 0; - int last = fields[0]; - - if (last < 0 || last > maxAllowedField) { - throw new IllegalArgumentException("Tuple position is out of range: " + last); + private static int[] createIncrIntArray(int numKeys) { + int[] keyFields = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + keyFields[i] = i; } + return keyFields; + } - for (; i < fields.length; i++) { - if (fields[i] < 0 || fields[i] > maxAllowedField) { - throw new IllegalArgumentException("Tuple position is out of range."); - } - if (fields[i] != last) { - k++; - last = fields[i]; - fields[k] = fields[i]; - } - } + private static 
void rangeCheckFields(int[] fields, int maxAllowedField) { - // check if we eliminated something - if (k == fields.length - 1) { - return fields; - } else { - return Arrays.copyOfRange(fields, 0, k+1); + for (int f : fields) { + if (f < 0 || f > maxAllowedField) { + throw new IndexOutOfBoundsException("Tuple position is out of range: " + f); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index c3d46f2..1384ca2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.PartitionOperatorBase; import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.tuple.Tuple2; @@ -71,10 +70,6 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys"); Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partioning requires a partitioner."); - if (pKeys instanceof Keys.ExpressionKeys<?> && !(input.getType() instanceof CompositeType) ) { - throw new IllegalArgumentException("Hash Partitioning 
with key fields only possible on Tuple or POJO DataSets"); - } - if (customPartitioner != null) { pKeys.validateCustomPartitioner(customPartitioner, partitionerTypeInfo); } http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java index 3fce01b..d65bc68 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java @@ -24,10 +24,7 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import java.util.Arrays; @@ -97,61 +94,22 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart private int[] getFlatFields(int field) { - if(!(super.getType() instanceof TupleTypeInfoBase<?>)) { - throw new InvalidProgramException("Field positions can only be specified on Tuple or " + - "Case Class types."); - } - else { - // check selected field is sortable type - TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) super.getType()).getTypeAt(field); - if (!sortKeyType.isSortKeyType()) { - throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); - } + if (!Keys.ExpressionKeys.isSortKey(field, super.getType())) { + 
throw new InvalidProgramException("Selected sort key is not a sortable type"); } - Keys.ExpressionKeys<T> ek; - try { - ek = new Keys.ExpressionKeys<T>(new int[]{field}, super.getType()); - } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of field position.", iae); - } + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(field, super.getType()); return ek.computeLogicalKeyPositions(); } private int[] getFlatFields(String fields) { - if(super.getType() instanceof CompositeType) { - - // check selected field is sortable type - TypeInformation<?> sortKeyType = ((CompositeType<?>) super.getType()).getTypeAt(fields); - if (!sortKeyType.isSortKeyType()) { - throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); - } - - // compute flat field positions for (nested) sorting fields - Keys.ExpressionKeys<T> ek; - try { - ek = new Keys.ExpressionKeys<T>(new String[]{fields}, super.getType()); - } catch(IllegalArgumentException iae) { - throw new InvalidProgramException("Invalid specification of field expression.", iae); - } - return ek.computeLogicalKeyPositions(); - } else { - - fields = fields.trim(); - if (!(fields.equals("*") || fields.equals("_"))) { - throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. 
" + - "Use a field wildcard for that (\"*\" or \"_\")"); - } else { - - // check if selected field is sortable type - if (!super.getType().isSortKeyType()) { - throw new InvalidProgramException("Selected sort key cannot be sorted: " + super.getType()); - } - - return new int[]{0}; - } + if (!Keys.ExpressionKeys.isSortKey(fields, super.getType())) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); } + + Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(fields, super.getType()); + return ek.computeLogicalKeyPositions(); } private void appendSorting(int[] flatOrderFields, Order order) { @@ -189,8 +147,8 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart } // distinguish between partition types - UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType()); - SortPartitionOperatorBase<T> noop = new SortPartitionOperatorBase<T>(operatorInfo, partitionOrdering, name); + UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getType(), getType()); + SortPartitionOperatorBase<T> noop = new SortPartitionOperatorBase<>(operatorInfo, partitionOrdering, name); noop.setInput(input); if(this.getParallelism() < 0) { // use parallelism of input if not explicitly specified http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 6092d14..f626e65 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators; import 
org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.FirstReducer; @@ -33,7 +32,6 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.TypeExtractor; import com.google.common.base.Preconditions; @@ -58,17 +56,14 @@ public class SortedGrouping<T> extends Grouping<T> { */ public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) { super(set, keys); - - if (!dataSet.getType().isTupleType()) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types"); - } - if (field >= dataSet.getType().getArity()) { - throw new IllegalArgumentException("Order key out of tuple bounds."); + + if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); } - isValidSortKeyType(field); // use int-based expression key to properly resolve nested tuples for grouping - ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); this.groupSortOrders = new Order[groupSortKeyPositions.length]; Arrays.fill(this.groupSortOrders, order); @@ -79,14 +74,14 @@ public class SortedGrouping<T> extends Grouping<T> { */ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) { super(set, keys); - - if (!(dataSet.getType() instanceof 
CompositeType)) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); + + if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); } - isValidSortKeyType(field); // resolve String-field to int using the expression keys - ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + this.groupSortKeyPositions = ek.computeLogicalKeyPositions(); this.groupSortOrders = new Order[groupSortKeyPositions.length]; Arrays.fill(this.groupSortOrders, order); // if field == "*" @@ -174,7 +169,7 @@ public class SortedGrouping<T> extends Grouping<T> { } TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType(), Utils.getCallLocationName(), true); - return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName()); + return new GroupReduceOperator<>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName()); } /** @@ -196,7 +191,7 @@ public class SortedGrouping<T> extends Grouping<T> { TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, this.getDataSet().getType(), Utils.getCallLocationName(), true); - return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName()); + return new GroupCombineOperator<>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName()); } @@ -233,15 +228,12 @@ public class SortedGrouping<T> extends Grouping<T> { if (groupSortSelectorFunctionKey != null) { throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported"); } - if (!dataSet.getType().isTupleType()) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid 
for tuple data types"); - } - if (field >= dataSet.getType().getArity()) { - throw new IllegalArgumentException("Order key out of tuple bounds."); + if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); } - isValidSortKeyType(field); - ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + addSortGroupInternal(ek, order); return this; } @@ -262,12 +254,12 @@ public class SortedGrouping<T> extends Grouping<T> { if (groupSortSelectorFunctionKey != null) { throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported"); } - if (! (dataSet.getType() instanceof CompositeType)) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)"); + if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) { + throw new InvalidProgramException("Selected sort key is not a sortable type"); } - isValidSortKeyType(field); - ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType()); + ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType()); + addSortGroupInternal(ek, order); return this; } @@ -287,26 +279,4 @@ public class SortedGrouping<T> extends Grouping<T> { } } - private void isValidSortKeyType(int field) { - TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) dataSet.getType()).getTypeAt(field); - if (!sortKeyType.isSortKeyType()) { - throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); - } - } - - private void isValidSortKeyType(String field) { - TypeInformation<?> sortKeyType; - - field = field.trim(); - if(field.equals("*") || field.equals("_")) { - sortKeyType = this.getDataSet().getType(); - } else { - sortKeyType = ((CompositeType<?>) 
this.getDataSet().getType()).getTypeAt(field); - } - - if (!sortKeyType.isSortKeyType()) { - throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType); - } - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java index f32f6a9..90a65e6 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java @@ -101,7 +101,7 @@ public class CoGroupOperatorTest { ds1.coGroup(ds2).where(0,1).equalTo(2); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testCoGroupKeyFields4() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -112,7 +112,7 @@ public class CoGroupOperatorTest { ds1.coGroup(ds2).where(5).equalTo(0); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testCoGroupKeyFields5() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -123,7 +123,7 @@ public class CoGroupOperatorTest { ds1.coGroup(ds2).where(-1).equalTo(-1); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = InvalidProgramException.class) public void testCoGroupKeyFields6() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -131,7 +131,7 @@ public class CoGroupOperatorTest { DataSet<CustomType> ds2 = env.fromCollection(customTypeData); // should not work, cogroup key fields on custom type - ds1.coGroup(ds2).where(5).equalTo(0); + 
ds1.coGroup(ds2).where(4).equalTo(0); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java index 5024a0e..0493583 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java @@ -63,7 +63,7 @@ public class DataSinkTest { // should work try { - tupleDs.sortPartition(0, Order.ANY).writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen").sortLocalOutput(0, Order.ANY); } catch (Exception e) { Assert.fail(); } @@ -79,9 +79,9 @@ public class DataSinkTest { // should work try { - tupleDs.sortPartition(0, Order.ASCENDING) - .sortPartition(3, Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput(0, Order.ASCENDING) + .sortLocalOutput(3, Order.DESCENDING); } catch (Exception e) { Assert.fail(); } @@ -97,13 +97,13 @@ public class DataSinkTest { // should work try { - tupleDs.sortPartition("f0", Order.ANY).writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("f0", Order.ANY); } catch (Exception e) { Assert.fail(); } } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) public void testTupleSingleOrderExpFull() { final ExecutionEnvironment env = ExecutionEnvironment @@ -112,7 +112,8 @@ public class DataSinkTest { .fromCollection(emptyTupleData, tupleTypeInfo); // should not work - tupleDs.sortPartition("*", Order.ANY).writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("*", Order.ANY); } @Test @@ -125,9 +126,9 @@ public class DataSinkTest { // 
should work try { - tupleDs.sortPartition("f1", Order.ASCENDING) - .sortPartition("f4", Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("f1", Order.ASCENDING) + .sortLocalOutput("f4", Order.DESCENDING); } catch (Exception e) { Assert.fail(); } @@ -143,9 +144,9 @@ public class DataSinkTest { // should work try { - tupleDs.sortPartition(4, Order.ASCENDING) - .sortPartition("f2", Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput(4, Order.ASCENDING) + .sortLocalOutput("f2", Order.DESCENDING); } catch (Exception e) { Assert.fail(); } @@ -160,9 +161,9 @@ public class DataSinkTest { .fromCollection(emptyTupleData, tupleTypeInfo); // must not work - tupleDs.sortPartition(3, Order.ASCENDING) - .sortPartition(5, Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput(3, Order.ASCENDING) + .sortLocalOutput(5, Order.DESCENDING); } @Test(expected = CompositeType.InvalidFieldReferenceException.class) @@ -174,9 +175,9 @@ public class DataSinkTest { .fromCollection(emptyTupleData, tupleTypeInfo); // must not work - tupleDs.sortPartition("notThere", Order.ASCENDING) - .sortPartition("f4", Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + tupleDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("notThere", Order.ASCENDING) + .sortLocalOutput("f4", Order.DESCENDING); } @Test @@ -189,7 +190,8 @@ public class DataSinkTest { // should work try { - longDs.sortPartition("*", Order.ASCENDING).writeAsText("/tmp/willNotHappen"); + longDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("*", Order.ASCENDING); } catch (Exception e) { Assert.fail(); } @@ -204,7 +206,8 @@ public class DataSinkTest { .generateSequence(0,2); // must not work - longDs.sortPartition(0, Order.ASCENDING).writeAsText("/tmp/willNotHappen"); + longDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput(0, 
Order.ASCENDING); } @Test(expected = InvalidProgramException.class) @@ -216,7 +219,8 @@ public class DataSinkTest { .generateSequence(0,2); // must not work - longDs.sortPartition("0", Order.ASCENDING).writeAsText("/tmp/willNotHappen"); + longDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("0", Order.ASCENDING); } @Test(expected = InvalidProgramException.class) @@ -228,7 +232,8 @@ public class DataSinkTest { .generateSequence(0,2); // must not work - longDs.sortPartition("nope", Order.ASCENDING).writeAsText("/tmp/willNotHappen"); + longDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("nope", Order.ASCENDING); } @Test @@ -241,7 +246,8 @@ public class DataSinkTest { // should work try { - pojoDs.sortPartition("myString", Order.ASCENDING).writeAsText("/tmp/willNotHappen"); + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("myString", Order.ASCENDING); } catch (Exception e) { Assert.fail(); } @@ -257,9 +263,9 @@ public class DataSinkTest { // should work try { - pojoDs.sortPartition("myLong", Order.ASCENDING) - .sortPartition("myString", Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("myLong", Order.ASCENDING) + .sortLocalOutput("myString", Order.DESCENDING); } catch (Exception e) { Assert.fail(); } @@ -274,7 +280,8 @@ public class DataSinkTest { .fromCollection(pojoData); // must not work - pojoDs.sortPartition(1, Order.DESCENDING).writeAsText("/tmp/willNotHappen"); + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput(1, Order.DESCENDING); } @Test(expected = CompositeType.InvalidFieldReferenceException.class) @@ -286,12 +293,12 @@ public class DataSinkTest { .fromCollection(pojoData); // must not work - pojoDs.sortPartition("myInt", Order.ASCENDING) - .sortPartition("notThere", Order.DESCENDING) - .writeAsText("/tmp/willNotHappen"); + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("myInt", Order.ASCENDING) + .sortLocalOutput("notThere", 
Order.DESCENDING); } - @Test(expected = CompositeType.InvalidFieldReferenceException.class) + @Test(expected = InvalidProgramException.class) public void testPojoSingleOrderFull() { final ExecutionEnvironment env = ExecutionEnvironment @@ -300,8 +307,8 @@ public class DataSinkTest { .fromCollection(pojoData); // must not work - pojoDs.sortPartition("*", Order.ASCENDING) - .writeAsText("/tmp/willNotHappen"); + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("*", Order.ASCENDING); } @Test(expected = InvalidProgramException.class) @@ -316,8 +323,8 @@ public class DataSinkTest { .fromCollection(arrayData); // must not work - pojoDs.sortPartition("*", Order.ASCENDING) - .writeAsText("/tmp/willNotHappen"); + pojoDs.writeAsText("/tmp/willNotHappen") + .sortLocalOutput("*", Order.ASCENDING); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java index f4bd945..2e9bdf7 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java @@ -112,7 +112,7 @@ public class DistinctOperatorTest { customDs.distinct(); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testDistinctByKeyFields6() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java index fe362f0..794f0d4 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java @@ -123,7 +123,7 @@ public class FullOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testFullOuter7() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java index bdad3db..9220095 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java @@ -107,7 +107,7 @@ public class GroupingTest { } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testGroupByKeyFields4() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -117,7 +117,7 @@ public class GroupingTest { tupleDs.groupBy(5); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testGroupByKeyFields5() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -334,7 +334,7 @@ public class 
GroupingTest { } } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testGroupSortKeyFields2() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java index be964cc..ae23382 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java @@ -137,7 +137,7 @@ public class JoinOperatorTest { ds1.join(ds2).where(0,1).equalTo(2); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testJoinKeyFields4() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -148,7 +148,7 @@ public class JoinOperatorTest { ds1.join(ds2).where(5).equalTo(0); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testJoinKeyFields5() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -159,7 +159,7 @@ public class JoinOperatorTest { ds1.join(ds2).where(-1).equalTo(-1); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = InvalidProgramException.class) public void testJoinKeyFields6() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -167,7 +167,7 @@ public class JoinOperatorTest { DataSet<CustomType> ds2 = env.fromCollection(customTypeData); // should not work, join key fields on custom type - ds1.join(ds2).where(5).equalTo(0); + 
ds1.join(ds2).where(4).equalTo(0); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java index 06b0c13..cab06c2 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java @@ -124,7 +124,7 @@ public class LeftOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testLeftOuter7() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/af029e7e/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java index 0e407ca..411edd5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java @@ -123,7 +123,7 @@ public class RightOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testRightOuter7() { final ExecutionEnvironment env 
= ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);