http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 0c0b710..c02d365 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RichFlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase; @@ -42,9 +43,9 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase; import org.apache.flink.api.java.operators.join.JoinType; import org.apache.flink.api.java.operators.join.JoinFunctionAssigner; @@ -394,8 +395,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, @SuppressWarnings("unchecked") SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>)rawKeys1; - TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1); - Operator<Tuple2<K, I1>> keyMapper1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1); + TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1); + Operator<Tuple2<K, I1>> keyMapper1 = KeyFunctions.appendKeyExtractor(input1, keys1); return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1); } @@ -406,8 +407,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, @SuppressWarnings("unchecked") SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>)rawKeys2; - TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2); - Operator<Tuple2<K, I2>> keyMapper2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2); + TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2); + Operator<Tuple2<K, I2>> keyMapper2 = KeyFunctions.appendKeyExtractor(input2, keys2); return withInput2(keyMapper2, typeInfoWithKey2, rawKeys2); }
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java new file mode 100644 index 0000000..49d598a --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.operators; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; +import org.apache.flink.api.java.operators.translation.KeyRemovingMapper; +import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; + +/** + * This class holds static utilities to append functions that extract and + * prune keys. + */ +public class KeyFunctions { + + @SuppressWarnings("unchecked") + public static <T, K> org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor( + org.apache.flink.api.common.operators.Operator<T> input, + SelectorFunctionKeys<T, K> key) + { + + TypeInformation<T> inputType = key.getInputType(); + TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key); + KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor()); + + MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper = + new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>( + extractor, + new UnaryOperatorInformation(inputType, typeInfoWithKey), + "Key Extractor" + ); + + mapper.setInput(input); + mapper.setParallelism(input.getParallelism()); + + return mapper; + } + + @SuppressWarnings("unchecked") + public static <T, K1, K2> org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> appendKeyExtractor( + org.apache.flink.api.common.operators.Operator<T> input, + SelectorFunctionKeys<T, K1> key1, + SelectorFunctionKeys<T, K2> key2) + { + + TypeInformation<T> inputType = key1.getInputType(); + TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2); + TwoKeyExtractingMapper<T, K1, K2> extractor = + new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor()); + + MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper = + new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>( + extractor, + new UnaryOperatorInformation<>(inputType, typeInfoWithKey), + "Key Extractor" + ); + + mapper.setInput(input); + mapper.setParallelism(input.getParallelism()); + + return mapper; + } + + public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover( + org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> inputWithKey, + SelectorFunctionKeys<T, K> key) + { + + TypeInformation<T> inputType = key.getInputType(); + TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key); + + MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper = + new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>( + new KeyRemovingMapper<T, K>(), + new UnaryOperatorInformation<>(typeInfoWithKey, inputType), + "Key Remover" + ); + mapper.setInput(inputWithKey); + mapper.setParallelism(inputWithKey.getParallelism()); + + return mapper; + } + + public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey( + SelectorFunctionKeys<T, K> key) + { + return new TupleTypeInfo<>(key.getKeyType(), key.getInputType()); + } + + public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey( + SelectorFunctionKeys<T, K1> key1, + SelectorFunctionKeys<T, K2> key2) + { + return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java deleted file mode 100644 index 5992f0b..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java +++ /dev/null @@ -1,550 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators; - -import java.util.ArrayList; -import java.util.List; - -import com.google.common.base.Joiner; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; -import org.apache.flink.api.java.operators.translation.KeyRemovingMapper; -import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -import com.google.common.base.Preconditions; - -public abstract class Keys<T> { - - public abstract int getNumberOfKeyFields(); - - public abstract int[] computeLogicalKeyPositions(); - - protected abstract TypeInformation<?>[] getKeyFieldTypes(); - - public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo); - - public boolean isEmpty() { - return getNumberOfKeyFields() == 0; - } - - /** - * Check if two sets of keys are compatible to each other (matching types, key counts) - */ - public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { - - TypeInformation<?>[] thisKeyFieldTypes = this.getKeyFieldTypes(); - TypeInformation<?>[] otherKeyFieldTypes = other.getKeyFieldTypes(); - - if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) { - throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); - } else { - for (int i = 0; i < thisKeyFieldTypes.length; i++) { - if (!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) { - throw new IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] ); - } - } - } - return true; - } - - // -------------------------------------------------------------------------------------------- - // Specializations for expression-based / extractor-based grouping - // -------------------------------------------------------------------------------------------- - - - public static class SelectorFunctionKeys<T, K> extends Keys<T> { - - private final KeySelector<T, K> keyExtractor; - private final TypeInformation<T> inputType; - private final TypeInformation<K> keyType; - private final List<FlatFieldDescriptor> keyFields; - - public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) { - - if (keyExtractor == null) { - throw new NullPointerException("Key extractor must not be null."); - } - if (keyType == null) { - throw new NullPointerException("Key type must not be null."); - } - if (!keyType.isKeyType()) { - throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type"); - } - - this.keyExtractor = keyExtractor; - this.inputType = inputType; - this.keyType = keyType; - - if (keyType instanceof CompositeType) { - this.keyFields = ((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR); - } - else { - this.keyFields = new ArrayList<>(1); - this.keyFields.add(new FlatFieldDescriptor(0, keyType)); - } - } - - public TypeInformation<K> getKeyType() { - return keyType; - } - - public TypeInformation<T> getInputType() { - return inputType; - } - - public KeySelector<T, K> getKeyExtractor() { - return keyExtractor; - } - - @Override - public int getNumberOfKeyFields() { - return keyFields.size(); - } - - @Override - public int[] computeLogicalKeyPositions() { - int[] logicalKeys = new int[keyFields.size()]; - for (int i = 0; i < keyFields.size(); i++) { - logicalKeys[i] = keyFields.get(i).getPosition(); - } - return logicalKeys; - } - - @Override - protected TypeInformation<?>[] getKeyFieldTypes() { - TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()]; - for (int i = 0; i < keyFields.size(); i++) { - fieldTypes[i] = keyFields.get(i).getType(); - } - return fieldTypes; - } - - @Override - public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) { - - if (keyFields.size() != 1) { - throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field."); - } - - if (typeInfo == null) { - // try to extract key type from partitioner - try { - typeInfo = TypeExtractor.getPartitionerTypes(partitioner); - } - catch (Throwable t) { - // best effort check, so we ignore exceptions - } - } - - // only check if type is known and not a generic type - if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) { - // check equality of key and partitioner type - if (!keyType.equals(typeInfo)) { - throw new InvalidProgramException("The partitioner is incompatible with the key type. " - + "Partitioner type: " + typeInfo + " , key type: " + keyType); - } - } - } - - @SuppressWarnings("unchecked") - public static <T, K> Operator<Tuple2<K, T>> appendKeyExtractor( - Operator<T> input, - SelectorFunctionKeys<T, K> key) - { - - TypeInformation<T> inputType = key.getInputType(); - TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key); - KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor()); - - MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper = - new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>( - extractor, - new UnaryOperatorInformation(inputType, typeInfoWithKey), - "Key Extractor" - ); - - mapper.setInput(input); - mapper.setParallelism(input.getParallelism()); - - return mapper; - } - - @SuppressWarnings("unchecked") - public static <T, K1, K2> Operator<Tuple3<K1, K2, T>> appendKeyExtractor( - Operator<T> input, - SelectorFunctionKeys<T, K1> key1, - SelectorFunctionKeys<T, K2> key2) - { - - TypeInformation<T> inputType = key1.getInputType(); - TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2); - TwoKeyExtractingMapper<T, K1, K2> extractor = - new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor()); - - MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper = - new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>( - extractor, - new UnaryOperatorInformation<>(inputType, typeInfoWithKey), - "Key Extractor" - ); - - mapper.setInput(input); - mapper.setParallelism(input.getParallelism()); - - return mapper; - } - - public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover( - Operator<Tuple2<K, T>> inputWithKey, - SelectorFunctionKeys<T, K> key) - { - - TypeInformation<T> inputType = key.getInputType(); - TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key); - - MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper = - new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>( - new KeyRemovingMapper<T, K>(), - new UnaryOperatorInformation<>(typeInfoWithKey, inputType), - "Key Remover" - ); - mapper.setInput(inputWithKey); - mapper.setParallelism(inputWithKey.getParallelism()); - - return mapper; - } - - public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey( - SelectorFunctionKeys<T, K> key) - { - return new TupleTypeInfo<>(key.getKeyType(), key.getInputType()); - } - - public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey( - SelectorFunctionKeys<T, K1> key1, - SelectorFunctionKeys<T, K2> key2) - { - return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType()); - } - - @Override - public String toString() { - return "Key function (Type: " + keyType + ")"; - } - } - - - /** - * Represents (nested) field access through string and integer-based keys - */ - public static class ExpressionKeys<T> extends Keys<T> { - - public static final String SELECT_ALL_CHAR = "*"; - public static final String SELECT_ALL_CHAR_SCALA = "_"; - - // Flattened fields representing keys fields - private List<FlatFieldDescriptor> keyFields; - - /** - * ExpressionKeys that is defined by the full data type. - */ - public ExpressionKeys(TypeInformation<T> type) { - this(SELECT_ALL_CHAR, type); - } - - /** - * Create int-based (non-nested) field position keys on a tuple type. - */ - public ExpressionKeys(int keyPosition, TypeInformation<T> type) { - this(new int[]{keyPosition}, type, false); - } - - /** - * Create int-based (non-nested) field position keys on a tuple type. - */ - public ExpressionKeys(int[] keyPositions, TypeInformation<T> type) { - this(keyPositions, type, false); - } - - /** - * Create int-based (non-nested) field position keys on a tuple type. - */ - public ExpressionKeys(int[] keyPositions, TypeInformation<T> type, boolean allowEmpty) { - - if (!type.isTupleType() || !(type instanceof CompositeType)) { - throw new InvalidProgramException("Specifying keys via field positions is only valid " + - "for tuple data types. Type: " + type); - } - if (type.getArity() == 0) { - throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity()); - } - if (!allowEmpty && (keyPositions == null || keyPositions.length == 0)) { - throw new IllegalArgumentException("The grouping fields must not be empty."); - } - - this.keyFields = new ArrayList<>(); - - if (keyPositions == null || keyPositions.length == 0) { - // use all tuple fields as key fields - keyPositions = createIncrIntArray(type.getArity()); - } else { - rangeCheckFields(keyPositions, type.getArity() - 1); - } - Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point"); - - // extract key field types - CompositeType<T> cType = (CompositeType<T>)type; - this.keyFields = new ArrayList<>(type.getTotalFields()); - - // for each key position, find all (nested) field types - String[] fieldNames = cType.getFieldNames(); - ArrayList<FlatFieldDescriptor> tmpList = new ArrayList<>(); - for (int keyPos : keyPositions) { - tmpList.clear(); - // get all flat fields - cType.getFlatFields(fieldNames[keyPos], 0, tmpList); - // check if fields are of key type - for(FlatFieldDescriptor ffd : tmpList) { - if(!ffd.getType().isKeyType()) { - throw new InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as key."); - } - } - this.keyFields.addAll(tmpList); - } - } - - /** - * Create String-based (nested) field expression keys on a composite type. - */ - public ExpressionKeys(String keyExpression, TypeInformation<T> type) { - this(new String[]{keyExpression}, type); - } - - /** - * Create String-based (nested) field expression keys on a composite type. - */ - public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) { - Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null."); - - this.keyFields = new ArrayList<>(keyExpressions.length); - - if (type instanceof CompositeType){ - CompositeType<T> cType = (CompositeType<T>) type; - - // extract the keys on their flat position - for (String keyExpr : keyExpressions) { - if (keyExpr == null) { - throw new InvalidProgramException("Expression key may not be null."); - } - // strip off whitespace - keyExpr = keyExpr.trim(); - - List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr); - - if (flatFields.size() == 0) { - throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType); - } - // check if all nested fields can be used as keys - for (FlatFieldDescriptor field : flatFields) { - if (!field.getType().isKeyType()) { - throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key."); - } - } - // add flat fields to key fields - keyFields.addAll(flatFields); - } - } - else { - if (!type.isKeyType()) { - throw new InvalidProgramException("This type (" + type + ") cannot be used as key."); - } - - // check that all key expressions are valid - for (String keyExpr : keyExpressions) { - if (keyExpr == null) { - throw new InvalidProgramException("Expression key may not be null."); - } - // strip off whitespace - keyExpr = keyExpr.trim(); - // check that full type is addressed - if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) { - throw new InvalidProgramException( - "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types."); - } - // add full type as key - keyFields.add(new FlatFieldDescriptor(0, type)); - } - } - } - - @Override - public int getNumberOfKeyFields() { - if(keyFields == null) { - return 0; - } - return keyFields.size(); - } - - @Override - public int[] computeLogicalKeyPositions() { - int[] logicalKeys = new int[keyFields.size()]; - for (int i = 0; i < keyFields.size(); i++) { - logicalKeys[i] = keyFields.get(i).getPosition(); - } - return logicalKeys; - } - - @Override - protected TypeInformation<?>[] getKeyFieldTypes() { - TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()]; - for (int i = 0; i < keyFields.size(); i++) { - fieldTypes[i] = keyFields.get(i).getType(); - } - return fieldTypes; - } - - @Override - public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) { - - if (keyFields.size() != 1) { - throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field."); - } - - if (typeInfo == null) { - // try to extract key type from partitioner - try { - typeInfo = TypeExtractor.getPartitionerTypes(partitioner); - } - catch (Throwable t) { - // best effort check, so we ignore exceptions - } - } - - if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) { - // only check type compatibility if type is known and not a generic type - - TypeInformation<?> keyType = keyFields.get(0).getType(); - if (!keyType.equals(typeInfo)) { - throw new InvalidProgramException("The partitioner is incompatible with the key type. " - + "Partitioner type: " + typeInfo + " , key type: " + keyType); - } - } - } - - @Override - public String toString() { - Joiner join = Joiner.on('.'); - return "ExpressionKeys: " + join.join(keyFields); - } - - public static boolean isSortKey(int fieldPos, TypeInformation<?> type) { - - if (!type.isTupleType() || !(type instanceof CompositeType)) { - throw new InvalidProgramException("Specifying keys via field positions is only valid " + - "for tuple data types. Type: " + type); - } - if (type.getArity() == 0) { - throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity()); - } - - if(fieldPos < 0 || fieldPos >= type.getArity()) { - throw new IndexOutOfBoundsException("Tuple position is out of range: " + fieldPos); - } - - TypeInformation<?> sortKeyType = ((CompositeType<?>)type).getTypeAt(fieldPos); - return sortKeyType.isSortKeyType(); - } - - public static boolean isSortKey(String fieldExpr, TypeInformation<?> type) { - - TypeInformation<?> sortKeyType; - - fieldExpr = fieldExpr.trim(); - if (SELECT_ALL_CHAR.equals(fieldExpr) || SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) { - sortKeyType = type; - } - else { - if (type instanceof CompositeType) { - sortKeyType = ((CompositeType<?>) type).getTypeAt(fieldExpr); - } - else { - throw new InvalidProgramException( - "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic types."); - } - } - - return sortKeyType.isSortKeyType(); - } - - } - - // -------------------------------------------------------------------------------------------- - - - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - - private static int[] createIncrIntArray(int numKeys) { - int[] keyFields = new int[numKeys]; - for (int i = 0; i < numKeys; i++) { - keyFields[i] = i; - } - return keyFields; - } - - private static void rangeCheckFields(int[] fields, int maxAllowedField) { - - for (int f : fields) { - if (f < 0 || f > maxAllowedField) { - throw new IndexOutOfBoundsException("Tuple position is out of range: " + f); - } - } - } - - public static class IncompatibleKeysException extends Exception { - private static final long serialVersionUID = 1L; - public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different."; - - public IncompatibleKeysException(String message) { - super(message); - } - - public IncompatibleKeysException(TypeInformation<?> typeInformation, TypeInformation<?> typeInformation2) { - super(typeInformation+" and "+typeInformation2+" are not compatible"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index 1384ca2..96931b0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -21,13 +21,14 @@ package org.apache.flink.api.java.operators; import com.google.common.base.Preconditions; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.PartitionOperatorBase; import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.tuple.Tuple2; /** @@ -150,9 +151,9 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe Partitioner<?> customPartitioner) { final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys; - TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys); + TypeInformation<Tuple2<K, T>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys); - Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys); + Operator<Tuple2<K, T>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys); PartitionOperatorBase<Tuple2<K, T>> keyedPartitionedInput = new PartitionOperatorBase<>(new UnaryOperatorInformation<>(typeInfoWithKey, typeInfoWithKey), pMethod, new int[]{0}, name); @@ -160,7 +161,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe keyedPartitionedInput.setCustomPartitioner(customPartitioner); keyedPartitionedInput.setParallelism(partitionDop); - return SelectorFunctionKeys.appendKeyRemover(keyedPartitionedInput, keys); + return KeyFunctions.appendKeyRemover(keyedPartitionedInput, keys); } http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 6f8877f..a22a262 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -19,13 +19,14 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; +import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.DataSet; @@ -161,13 +162,13 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe @SuppressWarnings("unchecked") final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys; - TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys); - Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys); + TypeInformation<Tuple2<K, T>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys); + Operator<Tuple2<K, T>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys); PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<>(function, keys, name, inputType, typeInfoWithKey); reducer.setInput(keyedInput); reducer.setParallelism(parallelism); - return SelectorFunctionKeys.appendKeyRemover(reducer, keys); + return KeyFunctions.appendKeyRemover(reducer, keys); } } http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java index d65bc68..c8a8684 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 07d0b9a..2453f1b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; @@ -31,7 +32,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.typeutils.TypeExtractor; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java index c55d919..6144975 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.CodeAnalysisMode; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.sca.CodeAnalyzerException; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 5b0a368..4e6f6ff 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java index e14e06d..734456c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin; import org.apache.flink.api.java.operators.JoinOperator.EquiJoin; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TypeExtractor; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java index fb74d1e..f620e11 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java index 2c62732..2307c0c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java index 56f34cc..30e28eb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java index c8e40ce..95b5840 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 72b79a4..21f15d4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java index 2aa8d54..e85bb79 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java index e52a5c4..46773fa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java index 278d706..f0e8055 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java index 1f4b44a..93cba33 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzer.java @@ -32,8 +32,8 @@ import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.Keys; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.sca.TaggedValue.Input; import org.objectweb.asm.Type; import org.objectweb.asm.tree.MethodNode; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java deleted file mode 100644 index 5f74513..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils; - -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.List; - -/** - * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) - * - * Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>} - * with a {@code GenericType<avro.Utf8>}. - * All other types used by Avro are standard Java types. - * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime. - * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here - * by generic type infos containing Utf8 classes (which are comparable), - * - * This class is checked by the AvroPojoTest. - * @param <T> - */ -public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { - public AvroTypeInfo(Class<T> typeClass) { - super(typeClass, generateFieldsFromAvroSchema(typeClass)); - } - - private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { - PojoTypeExtractor pte = new PojoTypeExtractor(); - TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null); - - if(!(ti instanceof PojoTypeInfo)) { - throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); - } - PojoTypeInfo pti = (PojoTypeInfo) ti; - List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields()); - - for(int i = 0; i < pti.getArity(); i++) { - PojoField f = pti.getPojoFieldAt(i); - TypeInformation newType = f.getTypeInformation(); - // check if type is a CharSequence - if(newType instanceof GenericTypeInfo) { - if((newType).getTypeClass().equals(CharSequence.class)) { - // replace the type by a org.apache.avro.util.Utf8 - newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); - } - } - PojoField newField = new PojoField(f.getField(), newType); - newFields.add(newField); - } - return newFields; - } - - private static class PojoTypeExtractor extends TypeExtractor { - private PojoTypeExtractor() { - super(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java deleted file mode 100644 index 8382831..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -/** - * This type represents a value of one two possible types, Left or Right (a - * disjoint union), inspired by Scala's Either type. - * - * @param <L> - * the type of Left - * @param <R> - * the type of Right - */ -public abstract class Either<L, R> { - - /** - * Create a Left value of Either - */ - public static <L, R> Either<L, R> Left(L value) { - return new Left<L, R>(value); - } - - /** - * Create a Right value of Either - */ - public static <L, R> Either<L, R> Right(R value) { - return new Right<L, R>(value); - } - - /** - * Retrieve the Left value of Either. - * - * @return the Left value - * @throws IllegalStateException - * if called on a Right - */ - public abstract L left() throws IllegalStateException; - - /** - * Retrieve the Right value of Either. - * - * @return the Right value - * @throws IllegalStateException - * if called on a Left - */ - public abstract R right() throws IllegalStateException; - - /** - * - * @return true if this is a Left value, false if this is a Right value - */ - public final boolean isLeft() { - return getClass() == Left.class; - } - - /** - * - * @return true if this is a Right value, false if this is a Left value - */ - public final boolean isRight() { - return getClass() == Right.class; - } - - /** - * A left value of {@link Either} - * - * @param <L> - * the type of Left - * @param <R> - * the type of Right - */ - public static class Left<L, R> extends Either<L, R> { - private final L value; - - public Left(L value) { - this.value = java.util.Objects.requireNonNull(value); - } - - @Override - public L left() { - return value; - } - - @Override - public R right() { - throw new IllegalStateException("Cannot retrieve Right value on a Left"); - } - - @Override - public boolean equals(Object object) { - if (object instanceof Left<?, ?>) { - final Left<?, ?> other = (Left<?, ?>) object; - return value.equals(other.value); - } - return false; - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public String toString() { - return "Left(" + value.toString() + ")"; - } - - /** - * Creates a left value of {@link Either} - * - */ - public static <L, R> Left<L, R> of(L left) { - return new Left<L, R>(left); - } - } - - /** - * A right value of {@link Either} - * - * @param <L> - * the type of Left - * @param <R> - * the type of Right - */ - public static class Right<L, R> extends Either<L, R> { - private final R value; - - public Right(R value) { - this.value = java.util.Objects.requireNonNull(value); - } - - @Override - public L left() { - throw new IllegalStateException("Cannot retrieve Left value on a Right"); - } - - @Override - public R right() { - return value; - } - - @Override - public boolean equals(Object object) { - if (object instanceof Right<?, ?>) { - final Right<?, ?> other = (Right<?, ?>) object; - return value.equals(other.value); - } - return false; - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public String toString() { - return "Right(" + value.toString() + ")"; - } - - /** - * Creates a right value of {@link Either} - * - */ - public static <L, R> Right<L, R> of(R right) { - return new Right<L, R>(right); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java deleted file mode 100644 index ec7be97..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; - -/** - * A {@link TypeInformation} for the {@link Either} type of the Java API. - * - * @param <L> the Left value type - * @param <R> the Right value type - */ -public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> { - - private static final long serialVersionUID = 1L; - - private final TypeInformation<L> leftType; - - private final TypeInformation<R> rightType; - - public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) { - this.leftType = leftType; - this.rightType = rightType; - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @SuppressWarnings("unchecked") - @Override - public Class<Either<L, R>> getTypeClass() { - return (Class<Either<L, R>>) (Class<?>) Either.class; - } - - @Override - public boolean isKeyType() { - return false; - } - - @Override - public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) { - return new EitherSerializer<L, R>(leftType.createSerializer(config), - rightType.createSerializer(config)); - } - - @Override - public String toString() { - return "Either <" + leftType.toString() + ", " + rightType.toString() + ">"; - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object obj) { - if (obj instanceof EitherTypeInfo) { - EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj; - - return other.canEqual(this) && - leftType.equals(other.leftType) && - rightType.equals(other.rightType); - } else { - return false; - } - } - - @Override - public int hashCode() { - return 17 * leftType.hashCode() + rightType.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof EitherTypeInfo; - } - - // -------------------------------------------------------------------------------------------- - - public TypeInformation<L> getLeftType() { - return leftType; - } - - public TypeInformation<R> getRightType() { - return rightType; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java deleted file mode 100644 index de59c36..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.EnumComparator; -import org.apache.flink.api.common.typeutils.base.EnumSerializer; - -/** - * A {@link TypeInformation} for java enumeration types. - * - * @param <T> The type represented by this type information. - */ -public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> { - - private static final long serialVersionUID = 8936740290137178660L; - - private final Class<T> typeClass; - - public EnumTypeInfo(Class<T> typeClass) { - Preconditions.checkNotNull(typeClass, "Enum type class must not be null."); - - if (!Enum.class.isAssignableFrom(typeClass) ) { - throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName()); - } - - this.typeClass = typeClass; - } - - @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { - return new EnumComparator<T>(sortOrderAscending); - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @Override - public Class<T> getTypeClass() { - return this.typeClass; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { - return new EnumSerializer<T>(typeClass); - } - - // ------------------------------------------------------------------------ - // Standard utils - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "EnumTypeInfo<" + typeClass.getName() + ">"; - } - - @Override - public int hashCode() { - return typeClass.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof EnumTypeInfo; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof EnumTypeInfo) { - @SuppressWarnings("unchecked") - EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj; - - return enumTypeInfo.canEqual(this) && - typeClass == enumTypeInfo.typeClass; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java deleted file mode 100644 index 7e7aa68..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; - - -public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> { - - private static final long serialVersionUID = -7959114120287706504L; - - private final Class<T> typeClass; - - public GenericTypeInfo(Class<T> typeClass) { - this.typeClass = Preconditions.checkNotNull(typeClass); - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @Override - public Class<T> getTypeClass() { - return typeClass; - } - - @Override - public boolean isKeyType() { - return Comparable.class.isAssignableFrom(typeClass); - } - - @Override - public TypeSerializer<T> createSerializer(ExecutionConfig config) { - return new KryoSerializer<T>(this.typeClass, config); - } - - @SuppressWarnings("unchecked") - @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { - if (isKeyType()) { - @SuppressWarnings("rawtypes") - GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), this.typeClass); - return (TypeComparator<T>) comparator; - } - - throw new UnsupportedOperationException("Types that do not implement java.lang.Comparable cannot be used as keys."); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return typeClass.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof GenericTypeInfo; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof GenericTypeInfo) { - @SuppressWarnings("unchecked") - GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj; - - return typeClass == genericTypeInfo.typeClass; - } else { - return false; - } - } - - @Override - public String toString() { - return "GenericType<" + typeClass.getCanonicalName() + ">"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java deleted file mode 100644 index f8b4247..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -/** - * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured - * with the data type they will operate on. The method {@link #setInputType(org.apache.flink.api - * .common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)} will be - * called when the output format is used with an output method such as - * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. - */ -public interface InputTypeConfigurable { - - /** - * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is passed to - * the DataSet's output method. May be used to configures the output format based on the data type. - * - * @param type The data type of the input. - * @param executionConfig - */ - void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig); -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java deleted file mode 100644 index 1dd7f01..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -/** - * A special type information signifying that the type extraction failed. It contains - * additional error information. - */ -public class MissingTypeInfo extends TypeInformation<InvalidTypesException> { - - private static final long serialVersionUID = -4212082837126702723L; - - private final String functionName; - private final InvalidTypesException typeException; - - - public MissingTypeInfo(String functionName) { - this(functionName, new InvalidTypesException("An unknown error occured.")); - } - - public MissingTypeInfo(String functionName, InvalidTypesException typeException) { - this.functionName = functionName; - this.typeException = typeException; - } - - // -------------------------------------------------------------------------------------------- - - public String getFunctionName() { - return functionName; - } - - public InvalidTypesException getTypeException() { - return typeException; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isBasicType() { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } - - @Override - public boolean isTupleType() { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } - - @Override - public int getArity() { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } - - @Override - public Class<InvalidTypesException> getTypeClass() { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } - - @Override - public boolean isKeyType() { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } - - @Override - public TypeSerializer<InvalidTypesException> createSerializer(ExecutionConfig executionConfig) { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">"; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof MissingTypeInfo) { - MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj; - - return missingTypeInfo.canEqual(this) && - functionName.equals(missingTypeInfo.functionName) && - typeException.equals(missingTypeInfo.typeException); - } else { - return false; - } - } - - @Override - public int hashCode() { - return 31 * functionName.hashCode() + typeException.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof MissingTypeInfo; - } - - @Override - public int getTotalFields() { - throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java deleted file mode 100644 index 150c976..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import java.lang.reflect.Array; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; - -public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> arrayType; - private final TypeInformation<C> componentInfo; - - private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) { - this.arrayType = Preconditions.checkNotNull(arrayType); - this.componentInfo = Preconditions.checkNotNull(componentInfo); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @SuppressWarnings("unchecked") - @Override - public Class<T> getTypeClass() { - return arrayType; - } - - public TypeInformation<C> getComponentInfo() { - return componentInfo; - } - - @Override - public boolean isKeyType() { - return false; - } - - @SuppressWarnings("unchecked") - @Override - public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { - return (TypeSerializer<T>) new GenericArraySerializer<C>( - componentInfo.getTypeClass(), - componentInfo.createSerializer(executionConfig)); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "<" + this.componentInfo + ">"; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ObjectArrayTypeInfo) { - @SuppressWarnings("unchecked") - ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>)obj; - - return objectArrayTypeInfo.canEqual(this) && - arrayType == objectArrayTypeInfo.arrayType && - componentInfo.equals(objectArrayTypeInfo.componentInfo); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof ObjectArrayTypeInfo; - } - - @Override - public int hashCode() { - return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode(); - } - - // -------------------------------------------------------------------------------------------- - - public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) { - Preconditions.checkNotNull(arrayClass); - Preconditions.checkNotNull(componentInfo); - Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array."); - - return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo); - } - - /** - * Creates a new {@link org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a - * {@link TypeInformation} for the component type. - * - * <p> - * This must be used in cases where the complete type of the array is not available as a - * {@link java.lang.reflect.Type} or {@link java.lang.Class}. - */ - @SuppressWarnings("unchecked") - public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) { - Preconditions.checkNotNull(componentInfo); - - return new ObjectArrayTypeInfo<T, C>( - (Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(), - componentInfo); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java deleted file mode 100644 index 1b008c0..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.Objects; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -/** - * Represent a field definition for {@link PojoTypeInfo} type of objects. - */ -public class PojoField implements Serializable { - - private static final long serialVersionUID = 1975295846436559363L; - - private transient Field field; - private final TypeInformation<?> type; - - public PojoField(Field field, TypeInformation<?> type) { - this.field = Preconditions.checkNotNull(field); - this.type = Preconditions.checkNotNull(type); - } - - public Field getField() { - return field; - } - - public TypeInformation<?> getTypeInformation() { - return type; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - out.writeObject(field.getDeclaringClass()); - out.writeUTF(field.getName()); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - Class<?> clazz = (Class<?>)in.readObject(); - String fieldName = in.readUTF(); - field = null; - // try superclasses as well - while (clazz != null) { - try { - field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } - if (field == null) { - throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." - + " (" + fieldName + ")"); - } - } - - @Override - public String toString() { - return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")"; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PojoField) { - PojoField other = (PojoField) obj; - - return other.canEqual(this) && type.equals(other.type) && - Objects.equals(field, other.field); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Objects.hash(field, type); - } - - public boolean canEqual(Object obj) { - return obj instanceof PojoField; - } -} \ No newline at end of file