[FLINK-3303] [core] Move all type utilities to flink-core
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21a71586 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21a71586 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21a71586 Branch: refs/heads/master Commit: 21a715867d655bb61df9a9f7eef37e42b99e206a Parents: 7081836 Author: Stephan Ewen <se...@apache.org> Authored: Sun Jan 31 23:28:32 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 2 16:55:44 2016 +0100 ---------------------------------------------------------------------- flink-core/pom.xml | 47 +- .../apache/flink/api/common/operators/Keys.java | 459 +++++ .../flink/api/java/functions/KeySelector.java | 63 + .../flink/api/java/typeutils/AvroTypeInfo.java | 78 + .../api/java/typeutils/EitherTypeInfo.java | 122 ++ .../flink/api/java/typeutils/EnumTypeInfo.java | 122 ++ .../api/java/typeutils/GenericTypeInfo.java | 116 ++ .../java/typeutils/InputTypeConfigurable.java | 42 + .../api/java/typeutils/MissingTypeInfo.java | 121 ++ .../api/java/typeutils/ObjectArrayTypeInfo.java | 141 ++ .../flink/api/java/typeutils/PojoField.java | 108 + .../flink/api/java/typeutils/PojoTypeInfo.java | 406 ++++ .../api/java/typeutils/ResultTypeQueryable.java | 37 + .../flink/api/java/typeutils/TupleTypeInfo.java | 248 +++ .../api/java/typeutils/TupleTypeInfoBase.java | 252 +++ .../flink/api/java/typeutils/TypeExtractor.java | 1692 +++++++++++++++ .../api/java/typeutils/TypeInfoParser.java | 383 ++++ .../flink/api/java/typeutils/ValueTypeInfo.java | 183 ++ .../api/java/typeutils/WritableTypeInfo.java | 139 ++ .../java/typeutils/runtime/AvroSerializer.java | 201 ++ .../runtime/CopyableValueComparator.java | 167 ++ .../runtime/CopyableValueSerializer.java | 129 ++ .../typeutils/runtime/DataInputDecoder.java | 229 +++ .../typeutils/runtime/DataInputViewStream.java | 71 + .../typeutils/runtime/DataOutputEncoder.java | 190 ++ .../typeutils/runtime/DataOutputViewStream.java | 41 + 
.../typeutils/runtime/EitherSerializer.java | 193 ++ .../runtime/GenericTypeComparator.java | 177 ++ .../api/java/typeutils/runtime/KryoUtils.java | 87 + .../java/typeutils/runtime/NoFetchingInput.java | 141 ++ .../java/typeutils/runtime/PojoComparator.java | 354 ++++ .../java/typeutils/runtime/PojoSerializer.java | 592 ++++++ .../runtime/RuntimeComparatorFactory.java | 75 + .../runtime/RuntimePairComparatorFactory.java | 44 + .../runtime/RuntimeSerializerFactory.java | 124 ++ .../typeutils/runtime/Tuple0Serializer.java | 121 ++ .../java/typeutils/runtime/TupleComparator.java | 157 ++ .../typeutils/runtime/TupleComparatorBase.java | 279 +++ .../java/typeutils/runtime/TupleSerializer.java | 158 ++ .../typeutils/runtime/TupleSerializerBase.java | 102 + .../java/typeutils/runtime/ValueComparator.java | 183 ++ .../java/typeutils/runtime/ValueSerializer.java | 152 ++ .../typeutils/runtime/WritableComparator.java | 189 ++ .../typeutils/runtime/WritableSerializer.java | 153 ++ .../typeutils/runtime/kryo/KryoSerializer.java | 382 ++++ .../typeutils/runtime/kryo/Serializers.java | 227 ++ .../common/operators/ExpressionKeysTest.java | 481 +++++ .../operators/SelectorFunctionKeysTest.java | 154 ++ .../apache/flink/api/java/tuple/Tuple2Test.java | 44 + .../api/java/typeutils/CompositeTypeTest.java | 179 ++ .../api/java/typeutils/EitherTypeInfoTest.java | 61 + .../api/java/typeutils/EnumTypeInfoTest.java | 51 + .../api/java/typeutils/GenericTypeInfoTest.java | 47 + .../api/java/typeutils/MissingTypeInfoTest.java | 47 + .../java/typeutils/ObjectArrayTypeInfoTest.java | 58 + .../java/typeutils/PojoTypeExtractionTest.java | 812 ++++++++ .../api/java/typeutils/PojoTypeInfoTest.java | 153 ++ .../java/typeutils/PojoTypeInformationTest.java | 98 + .../api/java/typeutils/TupleTypeInfoTest.java | 96 + .../TypeExtractorInputFormatsTest.java | 231 +++ .../api/java/typeutils/TypeExtractorTest.java | 1907 +++++++++++++++++ .../api/java/typeutils/TypeInfoParserTest.java | 338 +++ 
.../api/java/typeutils/ValueTypeInfoTest.java | 87 + .../java/typeutils/WritableTypeInfoTest.java | 74 + .../AbstractGenericArraySerializerTest.java | 187 ++ .../AbstractGenericTypeComparatorTest.java | 376 ++++ .../AbstractGenericTypeSerializerTest.java | 364 ++++ .../runtime/AvroGenericArraySerializerTest.java | 28 + .../runtime/AvroGenericTypeComparatorTest.java | 28 + .../runtime/AvroGenericTypeSerializerTest.java | 29 + .../runtime/AvroSerializerEmptyArrayTest.java | 189 ++ .../runtime/CopyableValueComparatorTest.java | 53 + .../typeutils/runtime/EitherSerializerTest.java | 113 + .../runtime/GenericPairComparatorTest.java | 89 + .../MultidimensionalArraySerializerTest.java | 120 ++ .../typeutils/runtime/PojoComparatorTest.java | 63 + .../typeutils/runtime/PojoContainingTuple.java | 44 + .../runtime/PojoGenericTypeSerializerTest.java | 33 + .../typeutils/runtime/PojoSerializerTest.java | 243 +++ .../runtime/PojoSubclassComparatorTest.java | 76 + .../runtime/PojoSubclassSerializerTest.java | 196 ++ .../typeutils/runtime/StringArrayWritable.java | 83 + .../SubclassFromInterfaceSerializerTest.java | 171 ++ .../runtime/TestDataOutputSerializer.java | 308 +++ .../runtime/TupleComparatorILD2Test.java | 73 + .../runtime/TupleComparatorILD3Test.java | 75 + .../runtime/TupleComparatorILDC3Test.java | 75 + .../runtime/TupleComparatorILDX1Test.java | 71 + .../runtime/TupleComparatorILDXC2Test.java | 73 + .../runtime/TupleComparatorISD1Test.java | 69 + .../runtime/TupleComparatorISD2Test.java | 73 + .../runtime/TupleComparatorISD3Test.java | 75 + .../runtime/TupleComparatorTTT1Test.java | 139 ++ .../runtime/TupleComparatorTTT2Test.java | 145 ++ .../runtime/TupleComparatorTTT3Test.java | 154 ++ .../typeutils/runtime/TupleSerializerTest.java | 238 +++ .../runtime/TupleSerializerTestInstance.java | 79 + .../typeutils/runtime/ValueComparatorTest.java | 53 + .../runtime/ValueComparatorUUIDTest.java | 46 + .../api/java/typeutils/runtime/ValueID.java | 72 + 
.../runtime/ValueSerializerUUIDTest.java | 50 + .../runtime/WritableComparatorTest.java | 53 + .../runtime/WritableComparatorUUIDTest.java | 46 + .../api/java/typeutils/runtime/WritableID.java | 78 + .../runtime/WritableSerializerTest.java | 50 + .../runtime/WritableSerializerUUIDTest.java | 50 + .../runtime/kryo/KryoClearedBufferTest.java | 287 +++ .../kryo/KryoGenericArraySerializerTest.java | 30 + .../kryo/KryoGenericTypeComparatorTest.java | 30 + .../kryo/KryoGenericTypeSerializerTest.java | 168 ++ .../kryo/KryoWithCustomSerializersTest.java | 75 + .../typeutils/runtime/kryo/SerializersTest.java | 103 + .../tuple/base/TupleComparatorTestBase.java | 43 + .../tuple/base/TuplePairComparatorTestBase.java | 109 + flink-java/pom.xml | 32 +- .../java/org/apache/flink/api/java/DataSet.java | 2 +- .../java/org/apache/flink/api/java/Utils.java | 19 - .../flink/api/java/functions/KeySelector.java | 63 - .../api/java/functions/SemanticPropUtil.java | 2 +- .../flink/api/java/io/SplitDataProperties.java | 2 +- .../api/java/operators/AggregateOperator.java | 1 + .../api/java/operators/CoGroupOperator.java | 23 +- .../api/java/operators/CoGroupRawOperator.java | 3 +- .../flink/api/java/operators/DataSink.java | 1 + .../api/java/operators/DeltaIteration.java | 1 + .../java/operators/DeltaIterationResultSet.java | 1 + .../api/java/operators/DistinctOperator.java | 7 +- .../java/operators/GroupCombineOperator.java | 11 +- .../api/java/operators/GroupReduceOperator.java | 11 +- .../flink/api/java/operators/Grouping.java | 1 + .../flink/api/java/operators/JoinOperator.java | 15 +- .../flink/api/java/operators/KeyFunctions.java | 119 ++ .../apache/flink/api/java/operators/Keys.java | 550 ----- .../api/java/operators/PartitionOperator.java | 9 +- .../api/java/operators/ReduceOperator.java | 9 +- .../java/operators/SortPartitionOperator.java | 1 + .../api/java/operators/SortedGrouping.java | 3 +- .../api/java/operators/UdfOperatorUtils.java | 1 + 
.../api/java/operators/UnsortedGrouping.java | 1 + .../operators/join/JoinOperatorSetsBase.java | 2 +- .../PlanBothUnwrappingCoGroupOperator.java | 2 +- .../PlanLeftUnwrappingCoGroupOperator.java | 2 +- .../PlanRightUnwrappingCoGroupOperator.java | 2 +- .../PlanUnwrappingGroupCombineOperator.java | 2 +- .../PlanUnwrappingReduceGroupOperator.java | 2 +- .../PlanUnwrappingReduceOperator.java | 2 +- ...lanUnwrappingSortedGroupCombineOperator.java | 2 +- ...PlanUnwrappingSortedReduceGroupOperator.java | 2 +- .../apache/flink/api/java/sca/UdfAnalyzer.java | 4 +- .../flink/api/java/typeutils/AvroTypeInfo.java | 78 - .../apache/flink/api/java/typeutils/Either.java | 185 -- .../api/java/typeutils/EitherTypeInfo.java | 121 -- .../flink/api/java/typeutils/EnumTypeInfo.java | 122 -- .../api/java/typeutils/GenericTypeInfo.java | 116 -- .../java/typeutils/InputTypeConfigurable.java | 42 - .../api/java/typeutils/MissingTypeInfo.java | 121 -- .../api/java/typeutils/ObjectArrayTypeInfo.java | 141 -- .../flink/api/java/typeutils/PojoField.java | 108 - .../flink/api/java/typeutils/PojoTypeInfo.java | 405 ---- .../api/java/typeutils/ResultTypeQueryable.java | 37 - .../flink/api/java/typeutils/TupleTypeInfo.java | 248 --- .../api/java/typeutils/TupleTypeInfoBase.java | 251 --- .../flink/api/java/typeutils/TypeExtractor.java | 1687 --------------- .../api/java/typeutils/TypeInfoParser.java | 383 ---- .../flink/api/java/typeutils/ValueTypeInfo.java | 183 -- .../api/java/typeutils/WritableTypeInfo.java | 139 -- .../java/typeutils/runtime/AvroSerializer.java | 201 -- .../runtime/CopyableValueComparator.java | 167 -- .../runtime/CopyableValueSerializer.java | 129 -- .../typeutils/runtime/DataInputDecoder.java | 229 --- .../typeutils/runtime/DataInputViewStream.java | 71 - .../typeutils/runtime/DataOutputEncoder.java | 190 -- .../typeutils/runtime/DataOutputViewStream.java | 41 - .../typeutils/runtime/EitherSerializer.java | 193 -- .../runtime/GenericTypeComparator.java | 177 -- 
.../api/java/typeutils/runtime/KryoUtils.java | 87 - .../java/typeutils/runtime/NoFetchingInput.java | 141 -- .../java/typeutils/runtime/PojoComparator.java | 354 ---- .../java/typeutils/runtime/PojoSerializer.java | 592 ------ .../runtime/RuntimeComparatorFactory.java | 75 - .../runtime/RuntimePairComparatorFactory.java | 44 - .../runtime/RuntimeSerializerFactory.java | 124 -- .../typeutils/runtime/Tuple0Serializer.java | 121 -- .../java/typeutils/runtime/TupleComparator.java | 157 -- .../typeutils/runtime/TupleComparatorBase.java | 279 --- .../java/typeutils/runtime/TupleSerializer.java | 158 -- .../typeutils/runtime/TupleSerializerBase.java | 102 - .../java/typeutils/runtime/ValueComparator.java | 183 -- .../java/typeutils/runtime/ValueSerializer.java | 152 -- .../typeutils/runtime/WritableComparator.java | 189 -- .../typeutils/runtime/WritableSerializer.java | 153 -- .../typeutils/runtime/kryo/KryoSerializer.java | 382 ---- .../typeutils/runtime/kryo/Serializers.java | 229 --- .../flink/api/java/TypeExtractionTest.java | 117 ++ .../api/java/operators/ExpressionKeysTest.java | 479 ----- .../flink/api/java/operators/NamesTest.java | 4 +- .../operators/SelectorFunctionKeysTest.java | 153 -- .../flink/api/java/sca/UdfAnalyzerTest.java | 2 +- .../apache/flink/api/java/tuple/Tuple2Test.java | 45 - .../type/extractor/PojoTypeExtractionTest.java | 876 -------- .../type/extractor/PojoTypeInformationTest.java | 98 - .../TypeExtractorInputFormatsTest.java | 234 --- .../java/type/extractor/TypeExtractorTest.java | 1931 ------------------ .../api/java/typeutils/CompositeTypeTest.java | 179 -- .../api/java/typeutils/EitherTypeInfoTest.java | 60 - .../api/java/typeutils/EnumTypeInfoTest.java | 51 - .../api/java/typeutils/GenericTypeInfoTest.java | 47 - .../api/java/typeutils/MissingTypeInfoTest.java | 47 - .../java/typeutils/ObjectArrayTypeInfoTest.java | 58 - .../api/java/typeutils/PojoTypeInfoTest.java | 153 -- .../api/java/typeutils/TupleTypeInfoTest.java | 96 - 
.../api/java/typeutils/TypeInfoParserTest.java | 338 --- .../api/java/typeutils/ValueTypeInfoTest.java | 87 - .../java/typeutils/WritableTypeInfoTest.java | 74 - .../AbstractGenericArraySerializerTest.java | 187 -- .../AbstractGenericTypeComparatorTest.java | 376 ---- .../AbstractGenericTypeSerializerTest.java | 364 ---- .../runtime/AvroGenericArraySerializerTest.java | 28 - .../runtime/AvroGenericTypeComparatorTest.java | 28 - .../runtime/AvroGenericTypeSerializerTest.java | 29 - .../runtime/AvroSerializerEmptyArrayTest.java | 189 -- .../runtime/CopyableValueComparatorTest.java | 53 - .../typeutils/runtime/EitherSerializerTest.java | 113 - .../runtime/GenericPairComparatorTest.java | 89 - .../MultidimensionalArraySerializerTest.java | 120 -- .../typeutils/runtime/PojoComparatorTest.java | 63 - .../typeutils/runtime/PojoContainingTuple.java | 44 - .../runtime/PojoGenericTypeSerializerTest.java | 33 - .../typeutils/runtime/PojoSerializerTest.java | 243 --- .../runtime/PojoSubclassComparatorTest.java | 76 - .../runtime/PojoSubclassSerializerTest.java | 196 -- .../typeutils/runtime/StringArrayWritable.java | 83 - .../SubclassFromInterfaceSerializerTest.java | 171 -- .../runtime/TestDataOutputSerializer.java | 308 --- .../runtime/TupleComparatorILD2Test.java | 73 - .../runtime/TupleComparatorILD3Test.java | 75 - .../runtime/TupleComparatorILDC3Test.java | 75 - .../runtime/TupleComparatorILDX1Test.java | 71 - .../runtime/TupleComparatorILDXC2Test.java | 73 - .../runtime/TupleComparatorISD1Test.java | 69 - .../runtime/TupleComparatorISD2Test.java | 73 - .../runtime/TupleComparatorISD3Test.java | 75 - .../runtime/TupleComparatorTTT1Test.java | 139 -- .../runtime/TupleComparatorTTT2Test.java | 145 -- .../runtime/TupleComparatorTTT3Test.java | 154 -- .../typeutils/runtime/TupleSerializerTest.java | 238 --- .../runtime/TupleSerializerTestInstance.java | 79 - .../typeutils/runtime/ValueComparatorTest.java | 53 - .../runtime/ValueComparatorUUIDTest.java | 46 - 
.../api/java/typeutils/runtime/ValueID.java | 72 - .../runtime/ValueSerializerUUIDTest.java | 50 - .../runtime/WritableComparatorTest.java | 53 - .../runtime/WritableComparatorUUIDTest.java | 46 - .../api/java/typeutils/runtime/WritableID.java | 78 - .../runtime/WritableSerializerTest.java | 50 - .../runtime/WritableSerializerUUIDTest.java | 50 - .../runtime/kryo/KryoClearedBufferTest.java | 287 --- .../kryo/KryoGenericArraySerializerTest.java | 30 - .../kryo/KryoGenericTypeComparatorTest.java | 30 - .../kryo/KryoGenericTypeSerializerTest.java | 168 -- .../kryo/KryoWithCustomSerializersTest.java | 75 - .../typeutils/runtime/kryo/SerializersTest.java | 103 - .../tuple/base/TupleComparatorTestBase.java | 43 - .../tuple/base/TuplePairComparatorTestBase.java | 109 - .../flink/python/api/PythonPlanBinder.java | 2 +- .../api/java/table/JavaBatchTranslator.scala | 5 +- .../scala/operators/ScalaAggregateOperator.java | 2 +- .../apache/flink/api/scala/CoGroupDataSet.scala | 4 +- .../org/apache/flink/api/scala/DataSet.scala | 21 +- .../apache/flink/api/scala/GroupedDataSet.scala | 4 +- .../api/scala/UnfinishedCoGroupOperation.scala | 2 + .../apache/flink/api/scala/joinDataSet.scala | 3 + .../api/scala/typeutils/CaseClassTypeInfo.scala | 3 +- .../api/scala/unfinishedKeyPairOperation.scala | 4 +- .../streaming/api/datastream/DataStream.java | 2 +- .../streaming/util/keys/KeySelectorUtil.java | 2 +- .../streaming/api/AggregationFunctionTest.java | 2 +- .../scala/operators/CoGroupOperatorTest.scala | 3 +- .../scala/operators/GroupCombineITCase.scala | 10 +- .../api/scala/operators/JoinOperatorTest.scala | 3 +- .../scala/types/TypeInformationGenTest.scala | 4 +- 281 files changed, 20419 insertions(+), 20528 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml 
b/flink-core/pom.xml index adc9a9b..ba1050c 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -40,12 +40,6 @@ under the License. <artifactId>flink-annotations</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - <version>${project.version}</version> - </dependency> <dependency> <groupId>commons-collections</groupId> @@ -56,20 +50,61 @@ under the License. <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> + <!-- managed version --> </dependency> + <!-- Avro is needed for the interoperability with Avro types for serialization --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <!-- managed version --> + <exclusions> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Hadoop is only needed here for serialization interoperability with the Writable type --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> + <!-- test dependencies --> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.5</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.joda</groupId> + <artifactId>joda-convert</artifactId> + <version>1.7</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> 
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java new file mode 100644 index 0000000..6d681de --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.base.Joiner; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import com.google.common.base.Preconditions; + +public abstract class Keys<T> { + + public abstract int getNumberOfKeyFields(); + + public abstract int[] computeLogicalKeyPositions(); + + protected abstract TypeInformation<?>[] getKeyFieldTypes(); + + public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo); + + public boolean isEmpty() { + return getNumberOfKeyFields() == 0; + } + + /** + * Check if two sets of keys are compatible to each other (matching types, key counts) + */ + public boolean areCompatible(Keys<?> other) throws IncompatibleKeysException { + + TypeInformation<?>[] thisKeyFieldTypes = this.getKeyFieldTypes(); + TypeInformation<?>[] otherKeyFieldTypes = other.getKeyFieldTypes(); + + if (thisKeyFieldTypes.length != otherKeyFieldTypes.length) { + throw new IncompatibleKeysException(IncompatibleKeysException.SIZE_MISMATCH_MESSAGE); + } else { + for (int i = 0; i < thisKeyFieldTypes.length; i++) { + if (!thisKeyFieldTypes[i].equals(otherKeyFieldTypes[i])) { + throw new IncompatibleKeysException(thisKeyFieldTypes[i], otherKeyFieldTypes[i] ); + } + } + } + return true; + } + + // -------------------------------------------------------------------------------------------- + // Specializations for expression-based / extractor-based grouping + // 
-------------------------------------------------------------------------------------------- + + + public static class SelectorFunctionKeys<T, K> extends Keys<T> { + + private final KeySelector<T, K> keyExtractor; + private final TypeInformation<T> inputType; + private final TypeInformation<K> keyType; + private final List<FlatFieldDescriptor> keyFields; + + public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) { + + if (keyExtractor == null) { + throw new NullPointerException("Key extractor must not be null."); + } + if (keyType == null) { + throw new NullPointerException("Key type must not be null."); + } + if (!keyType.isKeyType()) { + throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+" is not a valid key type"); + } + + this.keyExtractor = keyExtractor; + this.inputType = inputType; + this.keyType = keyType; + + if (keyType instanceof CompositeType) { + this.keyFields = ((CompositeType<T>)keyType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR); + } + else { + this.keyFields = new ArrayList<>(1); + this.keyFields.add(new FlatFieldDescriptor(0, keyType)); + } + } + + public TypeInformation<K> getKeyType() { + return keyType; + } + + public TypeInformation<T> getInputType() { + return inputType; + } + + public KeySelector<T, K> getKeyExtractor() { + return keyExtractor; + } + + @Override + public int getNumberOfKeyFields() { + return keyFields.size(); + } + + @Override + public int[] computeLogicalKeyPositions() { + int[] logicalKeys = new int[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + logicalKeys[i] = keyFields.get(i).getPosition(); + } + return logicalKeys; + } + + @Override + protected TypeInformation<?>[] getKeyFieldTypes() { + TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + fieldTypes[i] = keyFields.get(i).getType(); + } + return fieldTypes; + } + + 
@Override + public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) { + + if (keyFields.size() != 1) { + throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field."); + } + + if (typeInfo == null) { + // try to extract key type from partitioner + try { + typeInfo = TypeExtractor.getPartitionerTypes(partitioner); + } + catch (Throwable t) { + // best effort check, so we ignore exceptions + } + } + + // only check if type is known and not a generic type + if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) { + // check equality of key and partitioner type + if (!keyType.equals(typeInfo)) { + throw new InvalidProgramException("The partitioner is incompatible with the key type. " + + "Partitioner type: " + typeInfo + " , key type: " + keyType); + } + } + } + + @Override + public String toString() { + return "Key function (Type: " + keyType + ")"; + } + } + + + /** + * Represents (nested) field access through string and integer-based keys + */ + public static class ExpressionKeys<T> extends Keys<T> { + + public static final String SELECT_ALL_CHAR = "*"; + public static final String SELECT_ALL_CHAR_SCALA = "_"; + + // Flattened fields representing keys fields + private List<FlatFieldDescriptor> keyFields; + + /** + * ExpressionKeys that is defined by the full data type. + */ + public ExpressionKeys(TypeInformation<T> type) { + this(SELECT_ALL_CHAR, type); + } + + /** + * Create int-based (non-nested) field position keys on a tuple type. + */ + public ExpressionKeys(int keyPosition, TypeInformation<T> type) { + this(new int[]{keyPosition}, type, false); + } + + /** + * Create int-based (non-nested) field position keys on a tuple type. + */ + public ExpressionKeys(int[] keyPositions, TypeInformation<T> type) { + this(keyPositions, type, false); + } + + /** + * Create int-based (non-nested) field position keys on a tuple type. 
+ */ + public ExpressionKeys(int[] keyPositions, TypeInformation<T> type, boolean allowEmpty) { + + if (!type.isTupleType() || !(type instanceof CompositeType)) { + throw new InvalidProgramException("Specifying keys via field positions is only valid " + + "for tuple data types. Type: " + type); + } + if (type.getArity() == 0) { + throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity()); + } + if (!allowEmpty && (keyPositions == null || keyPositions.length == 0)) { + throw new IllegalArgumentException("The grouping fields must not be empty."); + } + + this.keyFields = new ArrayList<>(); + + if (keyPositions == null || keyPositions.length == 0) { + // use all tuple fields as key fields + keyPositions = createIncrIntArray(type.getArity()); + } else { + rangeCheckFields(keyPositions, type.getArity() - 1); + } + Preconditions.checkArgument(keyPositions.length > 0, "Grouping fields can not be empty at this point"); + + // extract key field types + CompositeType<T> cType = (CompositeType<T>)type; + this.keyFields = new ArrayList<>(type.getTotalFields()); + + // for each key position, find all (nested) field types + String[] fieldNames = cType.getFieldNames(); + ArrayList<FlatFieldDescriptor> tmpList = new ArrayList<>(); + for (int keyPos : keyPositions) { + tmpList.clear(); + // get all flat fields + cType.getFlatFields(fieldNames[keyPos], 0, tmpList); + // check if fields are of key type + for(FlatFieldDescriptor ffd : tmpList) { + if(!ffd.getType().isKeyType()) { + throw new InvalidProgramException("This type (" + ffd.getType() + ") cannot be used as key."); + } + } + this.keyFields.addAll(tmpList); + } + } + + /** + * Create String-based (nested) field expression keys on a composite type. + */ + public ExpressionKeys(String keyExpression, TypeInformation<T> type) { + this(new String[]{keyExpression}, type); + } + + /** + * Create String-based (nested) field expression keys on a composite type. 
+ */ + public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) { + Preconditions.checkNotNull(keyExpressions, "Field expression cannot be null."); + + this.keyFields = new ArrayList<>(keyExpressions.length); + + if (type instanceof CompositeType){ + CompositeType<T> cType = (CompositeType<T>) type; + + // extract the keys on their flat position + for (String keyExpr : keyExpressions) { + if (keyExpr == null) { + throw new InvalidProgramException("Expression key may not be null."); + } + // strip off whitespace + keyExpr = keyExpr.trim(); + + List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr); + + if (flatFields.size() == 0) { + throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType); + } + // check if all nested fields can be used as keys + for (FlatFieldDescriptor field : flatFields) { + if (!field.getType().isKeyType()) { + throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key."); + } + } + // add flat fields to key fields + keyFields.addAll(flatFields); + } + } + else { + if (!type.isKeyType()) { + throw new InvalidProgramException("This type (" + type + ") cannot be used as key."); + } + + // check that all key expressions are valid + for (String keyExpr : keyExpressions) { + if (keyExpr == null) { + throw new InvalidProgramException("Expression key may not be null."); + } + // strip off whitespace + keyExpr = keyExpr.trim(); + // check that full type is addressed + if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) { + throw new InvalidProgramException( + "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types."); + } + // add full type as key + keyFields.add(new FlatFieldDescriptor(0, type)); + } + } + } + + @Override + public int getNumberOfKeyFields() { + if(keyFields == null) { + return 0; + } + return keyFields.size(); + } + + @Override + 
public int[] computeLogicalKeyPositions() { + int[] logicalKeys = new int[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + logicalKeys[i] = keyFields.get(i).getPosition(); + } + return logicalKeys; + } + + @Override + protected TypeInformation<?>[] getKeyFieldTypes() { + TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + fieldTypes[i] = keyFields.get(i).getType(); + } + return fieldTypes; + } + + @Override + public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo) { + + if (keyFields.size() != 1) { + throw new InvalidProgramException("Custom partitioners can only be used with keys that have one key field."); + } + + if (typeInfo == null) { + // try to extract key type from partitioner + try { + typeInfo = TypeExtractor.getPartitionerTypes(partitioner); + } + catch (Throwable t) { + // best effort check, so we ignore exceptions + } + } + + if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo)) { + // only check type compatibility if type is known and not a generic type + + TypeInformation<?> keyType = keyFields.get(0).getType(); + if (!keyType.equals(typeInfo)) { + throw new InvalidProgramException("The partitioner is incompatible with the key type. " + + "Partitioner type: " + typeInfo + " , key type: " + keyType); + } + } + } + + @Override + public String toString() { + Joiner join = Joiner.on('.'); + return "ExpressionKeys: " + join.join(keyFields); + } + + public static boolean isSortKey(int fieldPos, TypeInformation<?> type) { + + if (!type.isTupleType() || !(type instanceof CompositeType)) { + throw new InvalidProgramException("Specifying keys via field positions is only valid " + + "for tuple data types. Type: " + type); + } + if (type.getArity() == 0) { + throw new InvalidProgramException("Tuple size must be greater than 0. 
Size: " + type.getArity()); + } + + if(fieldPos < 0 || fieldPos >= type.getArity()) { + throw new IndexOutOfBoundsException("Tuple position is out of range: " + fieldPos); + } + + TypeInformation<?> sortKeyType = ((CompositeType<?>)type).getTypeAt(fieldPos); + return sortKeyType.isSortKeyType(); + } + + public static boolean isSortKey(String fieldExpr, TypeInformation<?> type) { + + TypeInformation<?> sortKeyType; + + fieldExpr = fieldExpr.trim(); + if (SELECT_ALL_CHAR.equals(fieldExpr) || SELECT_ALL_CHAR_SCALA.equals(fieldExpr)) { + sortKeyType = type; + } + else { + if (type instanceof CompositeType) { + sortKeyType = ((CompositeType<?>) type).getTypeAt(fieldExpr); + } + else { + throw new InvalidProgramException( + "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for atomic types."); + } + } + + return sortKeyType.isSortKeyType(); + } + + } + + // -------------------------------------------------------------------------------------------- + + + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + + private static int[] createIncrIntArray(int numKeys) { + int[] keyFields = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + keyFields[i] = i; + } + return keyFields; + } + + private static void rangeCheckFields(int[] fields, int maxAllowedField) { + + for (int f : fields) { + if (f < 0 || f > maxAllowedField) { + throw new IndexOutOfBoundsException("Tuple position is out of range: " + f); + } + } + } + + public static class IncompatibleKeysException extends Exception { + private static final long serialVersionUID = 1L; + public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different."; + + public IncompatibleKeysException(String message) { + super(message); + } + + public IncompatibleKeysException(TypeInformation<?> 
typeInformation, TypeInformation<?> typeInformation2) { + super(typeInformation+" and "+typeInformation2+" are not compatible"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java new file mode 100644 index 0000000..3d06c59 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * The {@link KeySelector} allows to use arbitrary objects for operations such as + * reduce, reduceGroup, join, coGroup, etc. + * + * The extractor takes an object and returns the key for that object. + * + * @param <IN> Type of objects to extract the key from. + * @param <KEY> Type of key. 
+ */ +public interface KeySelector<IN, KEY> extends Function, Serializable { + + /** + * User-defined function that extracts the key from an arbitrary object. + * + * For example for a class: + * <pre> + * public class Word { + * String word; + * int count; + * } + * </pre> + * The key extractor could return the word as + * a key to group all Word objects by the String they contain. + * + * The code would look like this + * <pre> + * public String getKey(Word w) { + * return w.word; + * } + * </pre> + * + * @param value The object to get the key from. + * @return The extracted key. + * + * @throws Exception Throwing an exception will cause the execution of the respective task to fail, + * and trigger recovery or cancellation of the program. + */ + KEY getKey(IN value) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java new file mode 100644 index 0000000..5f74513 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.typeutils; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) + * + * Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>} + * with a {@code GenericType<avro.Utf8>}. + * All other types used by Avro are standard Java types. + * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime. + * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here + * by generic type infos containing Utf8 classes (which are comparable), + * + * This class is checked by the AvroPojoTest. 
+ * @param <T> + */ +public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { + public AvroTypeInfo(Class<T> typeClass) { + super(typeClass, generateFieldsFromAvroSchema(typeClass)); + } + + private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { + PojoTypeExtractor pte = new PojoTypeExtractor(); + TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null); + + if(!(ti instanceof PojoTypeInfo)) { + throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); + } + PojoTypeInfo pti = (PojoTypeInfo) ti; + List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields()); + + for(int i = 0; i < pti.getArity(); i++) { + PojoField f = pti.getPojoFieldAt(i); + TypeInformation newType = f.getTypeInformation(); + // check if type is a CharSequence + if(newType instanceof GenericTypeInfo) { + if((newType).getTypeClass().equals(CharSequence.class)) { + // replace the type by a org.apache.avro.util.Utf8 + newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + } + } + PojoField newField = new PojoField(f.getField(), newType); + newFields.add(newField); + } + return newFields; + } + + private static class PojoTypeExtractor extends TypeExtractor { + private PojoTypeExtractor() { + super(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java new file mode 100644 index 0000000..74d850b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; +import org.apache.flink.types.Either; + +/** + * A {@link TypeInformation} for the {@link Either} type of the Java API. 
+ * + * @param <L> the Left value type + * @param <R> the Right value type + */ +public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> { + + private static final long serialVersionUID = 1L; + + private final TypeInformation<L> leftType; + + private final TypeInformation<R> rightType; + + public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) { + this.leftType = leftType; + this.rightType = rightType; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @SuppressWarnings("unchecked") + @Override + public Class<Either<L, R>> getTypeClass() { + return (Class<Either<L, R>>) (Class<?>) Either.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) { + return new EitherSerializer<L, R>(leftType.createSerializer(config), + rightType.createSerializer(config)); + } + + @Override + public String toString() { + return "Either <" + leftType.toString() + ", " + rightType.toString() + ">"; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (obj instanceof EitherTypeInfo) { + EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj; + + return other.canEqual(this) && + leftType.equals(other.leftType) && + rightType.equals(other.rightType); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 17 * leftType.hashCode() + rightType.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EitherTypeInfo; + } + + // -------------------------------------------------------------------------------------------- + + public TypeInformation<L> getLeftType() { + return leftType; + } + + public TypeInformation<R> getRightType() { 
+ return rightType; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java new file mode 100644 index 0000000..de59c36 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.EnumComparator; +import org.apache.flink.api.common.typeutils.base.EnumSerializer; + +/** + * A {@link TypeInformation} for java enumeration types. 
+ * + * @param <T> The type represented by this type information. + */ +public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> { + + private static final long serialVersionUID = 8936740290137178660L; + + private final Class<T> typeClass; + + public EnumTypeInfo(Class<T> typeClass) { + Preconditions.checkNotNull(typeClass, "Enum type class must not be null."); + + if (!Enum.class.isAssignableFrom(typeClass) ) { + throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName()); + } + + this.typeClass = typeClass; + } + + @Override + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + return new EnumComparator<T>(sortOrderAscending); + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class<T> getTypeClass() { + return this.typeClass; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { + return new EnumSerializer<T>(typeClass); + } + + // ------------------------------------------------------------------------ + // Standard utils + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "EnumTypeInfo<" + typeClass.getName() + ">"; + } + + @Override + public int hashCode() { + return typeClass.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EnumTypeInfo; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof EnumTypeInfo) { + @SuppressWarnings("unchecked") + EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj; + + return enumTypeInfo.canEqual(this) && + 
typeClass == enumTypeInfo.typeClass; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java new file mode 100644 index 0000000..7e7aa68 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; + + +public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> { + + private static final long serialVersionUID = -7959114120287706504L; + + private final Class<T> typeClass; + + public GenericTypeInfo(Class<T> typeClass) { + this.typeClass = Preconditions.checkNotNull(typeClass); + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class<T> getTypeClass() { + return typeClass; + } + + @Override + public boolean isKeyType() { + return Comparable.class.isAssignableFrom(typeClass); + } + + @Override + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + return new KryoSerializer<T>(this.typeClass, config); + } + + @SuppressWarnings("unchecked") + @Override + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + if (isKeyType()) { + @SuppressWarnings("rawtypes") + GenericTypeComparator comparator = new GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), this.typeClass); + return (TypeComparator<T>) comparator; + } + + throw new UnsupportedOperationException("Types that do not implement java.lang.Comparable cannot be used as keys."); + } + + // 
-------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return typeClass.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof GenericTypeInfo; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof GenericTypeInfo) { + @SuppressWarnings("unchecked") + GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj; + + return typeClass == genericTypeInfo.typeClass; + } else { + return false; + } + } + + @Override + public String toString() { + return "GenericType<" + typeClass.getCanonicalName() + ">"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java new file mode 100644 index 0000000..f8b4247 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured + * with the data type they will operate on. The method {@link #setInputType(org.apache.flink.api + * .common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)} will be + * called when the output format is used with an output method such as + * {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}. + */ +public interface InputTypeConfigurable { + + /** + * Method that is called on an {@link org.apache.flink.api.common.io.OutputFormat} when it is passed to + * the DataSet's output method. May be used to configures the output format based on the data type. + * + * @param type The data type of the input. + * @param executionConfig + */ + void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig); +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java new file mode 100644 index 0000000..1dd7f01 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * A special type information signifying that the type extraction failed. It contains + * additional error information. 
+ */ +public class MissingTypeInfo extends TypeInformation<InvalidTypesException> { + + private static final long serialVersionUID = -4212082837126702723L; + + private final String functionName; + private final InvalidTypesException typeException; + + + public MissingTypeInfo(String functionName) { + this(functionName, new InvalidTypesException("An unknown error occured.")); + } + + public MissingTypeInfo(String functionName, InvalidTypesException typeException) { + this.functionName = functionName; + this.typeException = typeException; + } + + // -------------------------------------------------------------------------------------------- + + public String getFunctionName() { + return functionName; + } + + public InvalidTypesException getTypeException() { + return typeException; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public boolean isBasicType() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public boolean isTupleType() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public int getArity() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public Class<InvalidTypesException> getTypeClass() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public boolean isKeyType() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public TypeSerializer<InvalidTypesException> createSerializer(ExecutionConfig executionConfig) { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } + + @Override + public String toString() { 
+ return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof MissingTypeInfo) { + MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj; + + return missingTypeInfo.canEqual(this) && + functionName.equals(missingTypeInfo.functionName) && + typeException.equals(missingTypeInfo.typeException); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * functionName.hashCode() + typeException.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof MissingTypeInfo; + } + + @Override + public int getTotalFields() { + throw new UnsupportedOperationException("The missing type information cannot be used as a type information."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java new file mode 100644 index 0000000..150c976 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils;

import java.lang.reflect.Array;

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;

/**
 * Type information for arrays with an object component type, serialized via
 * a {@link GenericArraySerializer} over the component's serializer.
 *
 * @param <T> The array type, e.g. {@code String[]}.
 * @param <C> The component type, e.g. {@code String}.
 */
public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {

	private static final long serialVersionUID = 1L;

	/** The concrete array class. */
	private final Class<T> arrayType;

	/** Type information of the array's component type. */
	private final TypeInformation<C> componentInfo;

	/** Instances are created via the {@link #getInfoFor} factory methods. */
	private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
		this.arrayType = Preconditions.checkNotNull(arrayType);
		this.componentInfo = Preconditions.checkNotNull(componentInfo);
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public boolean isBasicType() {
		return false;
	}

	@Override
	public boolean isTupleType() {
		return false;
	}

	@Override
	public int getArity() {
		return 1;
	}

	@Override
	public int getTotalFields() {
		return 1;
	}

	@Override
	public Class<T> getTypeClass() {
		// note: no cast happens here, so no "unchecked" suppression is needed
		return arrayType;
	}

	public TypeInformation<C> getComponentInfo() {
		return componentInfo;
	}

	@Override
	public boolean isKeyType() {
		return false;
	}

	@SuppressWarnings("unchecked")
	@Override
	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
		return (TypeSerializer<T>) new GenericArraySerializer<C>(
			componentInfo.getTypeClass(),
			componentInfo.createSerializer(executionConfig));
	}

	@Override
	public String toString() {
		return this.getClass().getSimpleName() + "<" + this.componentInfo + ">";
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof ObjectArrayTypeInfo) {
			@SuppressWarnings("unchecked")
			ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>) obj;

			return objectArrayTypeInfo.canEqual(this) &&
				arrayType == objectArrayTypeInfo.arrayType &&
				componentInfo.equals(objectArrayTypeInfo.componentInfo);
		} else {
			return false;
		}
	}

	@Override
	public boolean canEqual(Object obj) {
		return obj instanceof ObjectArrayTypeInfo;
	}

	@Override
	public int hashCode() {
		return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode();
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Creates type information for the given array class and component type information.
	 *
	 * @throws IllegalArgumentException if {@code arrayClass} is not an array class
	 */
	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
		Preconditions.checkNotNull(arrayClass);
		Preconditions.checkNotNull(componentInfo);
		Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");

		return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
	}

	/**
	 * Creates a new {@link org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
	 * {@link TypeInformation} for the component type.
	 *
	 * <p>
	 * This must be used in cases where the complete type of the array is not available as a
	 * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
	 */
	@SuppressWarnings("unchecked")
	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
		Preconditions.checkNotNull(componentInfo);

		return new ObjectArrayTypeInfo<T, C>(
			(Class<T>) Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
			componentInfo);
	}
}
+ */ + +package org.apache.flink.api.java.typeutils; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.Objects; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Represent a field definition for {@link PojoTypeInfo} type of objects. + */ +public class PojoField implements Serializable { + + private static final long serialVersionUID = 1975295846436559363L; + + private transient Field field; + private final TypeInformation<?> type; + + public PojoField(Field field, TypeInformation<?> type) { + this.field = Preconditions.checkNotNull(field); + this.type = Preconditions.checkNotNull(type); + } + + public Field getField() { + return field; + } + + public TypeInformation<?> getTypeInformation() { + return type; + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + out.writeObject(field.getDeclaringClass()); + out.writeUTF(field.getName()); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + Class<?> clazz = (Class<?>)in.readObject(); + String fieldName = in.readUTF(); + field = null; + // try superclasses as well + while (clazz != null) { + try { + field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + break; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + if (field == null) { + throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." + + " (" + fieldName + ")"); + } + } + + @Override + public String toString() { + return "PojoField " + field.getDeclaringClass() + "." 
+ field.getName() + " (" + type + ")"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PojoField) { + PojoField other = (PojoField) obj; + + return other.canEqual(this) && type.equals(other.type) && + Objects.equals(field, other.field); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(field, type); + } + + public boolean canEqual(Object obj) { + return obj instanceof PojoField; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java new file mode 100644 index 0000000..cc0d239 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.api.java.typeutils;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;

import com.google.common.base.Joiner;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

/**
 * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
 * since the conditions are slightly different from Java Beans.
 * A type is considered a Flink POJO type if it fulfills the conditions below.
 * <ul>
 * <li>It is a public class, and standalone (not a non-static inner class)</li>
 * <li>It has a public no-argument constructor.</li>
 * <li>All fields are either public, or have public getters and setters.</li>
 * </ul>
 *
 * @param <T> The type represented by this type information.
+ */ +public class PojoTypeInfo<T> extends CompositeType<T> { + + private static final long serialVersionUID = 1L; + + private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"; + private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; + private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR + +"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA; + + private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS); + private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); + + private final PojoField[] fields; + + private final int totalFields; + + public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) { + super(typeClass); + + Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()), + "POJO " + typeClass + " is not public"); + + this.fields = fields.toArray(new PojoField[fields.size()]); + + Arrays.sort(this.fields, new Comparator<PojoField>() { + @Override + public int compare(PojoField o1, PojoField o2) { + return o1.getField().getName().compareTo(o2.getField().getName()); + } + }); + + int counterFields = 0; + + for(PojoField field : fields) { + counterFields += field.getTypeInformation().getTotalFields(); + } + + totalFields = counterFields; + } + + @Override + public boolean isBasicType() { + return false; + } + + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return fields.length; + } + + @Override + public int getTotalFields() { + return totalFields; + } + + @Override + public boolean isSortKeyType() { + // Support for sorting POJOs that implement Comparable is not implemented yet. + // Since the order of fields in a POJO type is not well defined, sorting on fields + // gives only some undefined order. 
+ return false; + } + + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); + if(!matcher.matches()) { + throw new InvalidFieldReferenceException("Invalid POJO field reference \""+fieldExpression+"\"."); + } + + String field = matcher.group(0); + if(field.equals(ExpressionKeys.SELECT_ALL_CHAR) || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + // handle select all + int keyPosition = 0; + for(PojoField pField : fields) { + if(pField.getTypeInformation() instanceof CompositeType) { + CompositeType<?> cType = (CompositeType<?>)pField.getTypeInformation(); + cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result); + keyPosition += cType.getTotalFields()-1; + } else { + result.add( + new NamedFlatFieldDescriptor( + pField.getField().getName(), + offset + keyPosition, + pField.getTypeInformation())); + } + keyPosition++; + } + return; + } else { + field = matcher.group(1); + } + + // get field + int fieldPos = -1; + TypeInformation<?> fieldType = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].getField().getName().equals(field)) { + fieldPos = i; + fieldType = fields[i].getTypeInformation(); + break; + } + } + if (fieldPos == -1) { + throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); + } + String tail = matcher.group(3); + if(tail == null) { + if(fieldType instanceof CompositeType) { + // forward offset + for(int i=0; i<fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + // add all fields of composite type + ((CompositeType<?>) fieldType).getFlatFields("*", offset, result); + } else { + // we found the field to add + // compute flat field position by adding skipped fields + int flatFieldPos = offset; + for(int i=0; i<fieldPos; i++) { + flatFieldPos += this.getTypeAt(i).getTotalFields(); + } + 
result.add(new FlatFieldDescriptor(flatFieldPos, fieldType)); + } + } else { + if(fieldType instanceof CompositeType<?>) { + // forward offset + for(int i=0; i<fieldPos; i++) { + offset += this.getTypeAt(i).getTotalFields(); + } + ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression); + if(!matcher.matches()) { + if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here."); + } else { + throw new InvalidFieldReferenceException("Invalid format of POJO field expression \""+fieldExpression+"\"."); + } + } + + String field = matcher.group(1); + // get field + int fieldPos = -1; + TypeInformation<?> fieldType = null; + for (int i = 0; i < fields.length; i++) { + if (fields[i].getField().getName().equals(field)) { + fieldPos = i; + fieldType = fields[i].getTypeInformation(); + break; + } + } + if (fieldPos == -1) { + throw new InvalidFieldReferenceException("Unable to find field \""+field+"\" in type "+this+"."); + } + + String tail = matcher.group(3); + if(tail == null) { + // we found the type + return (TypeInformation<X>) fieldType; + } else { + if(fieldType instanceof CompositeType<?>) { + return ((CompositeType<?>) fieldType).getTypeAt(tail); + } else { + throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+"."); + } + } + } + + @Override + public <X> TypeInformation<X> getTypeAt(int pos) { + if (pos < 0 || pos >= this.fields.length) { + throw new IndexOutOfBoundsException(); + } + 
@SuppressWarnings("unchecked") + TypeInformation<X> typed = (TypeInformation<X>) fields[pos].getTypeInformation(); + return typed; + } + + @Override + protected TypeComparatorBuilder<T> createTypeComparatorBuilder() { + return new PojoTypeComparatorBuilder(); + } + + // used for testing. Maybe use mockito here + public PojoField getPojoFieldAt(int pos) { + if (pos < 0 || pos >= this.fields.length) { + throw new IndexOutOfBoundsException(); + } + return this.fields[pos]; + } + + public String[] getFieldNames() { + String[] result = new String[fields.length]; + for (int i = 0; i < fields.length; i++) { + result[i] = fields[i].getField().getName(); + } + return result; + } + + @Override + public int getFieldIndex(String fieldName) { + for (int i = 0; i < fields.length; i++) { + if (fields[i].getField().getName().equals(fieldName)) { + return i; + } + } + return -1; + } + + @Override + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + if(config.isForceKryoEnabled()) { + return new KryoSerializer<T>(getTypeClass(), config); + } + if(config.isForceAvroEnabled()) { + return new AvroSerializer<T>(getTypeClass()); + } + + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ]; + Field[] reflectiveFields = new Field[fields.length]; + + for (int i = 0; i < fields.length; i++) { + fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config); + reflectiveFields[i] = fields[i].getField(); + } + + return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PojoTypeInfo) { + @SuppressWarnings("unchecked") + PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj; + + return pojoTypeInfo.canEqual(this) && + super.equals(pojoTypeInfo) && + Arrays.equals(fields, pojoTypeInfo.fields) && + totalFields == pojoTypeInfo.totalFields; + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * (31 * 
Arrays.hashCode(fields) + totalFields) + super.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof PojoTypeInfo; + } + + @Override + public String toString() { + List<String> fieldStrings = new ArrayList<String>(); + for (PojoField field : fields) { + fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString()); + } + return "PojoType<" + getTypeClass().getName() + + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]" + + ">"; + } + + // -------------------------------------------------------------------------------------------- + + private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> { + + private ArrayList<TypeComparator> fieldComparators; + private ArrayList<Field> keyFields; + + public PojoTypeComparatorBuilder() { + fieldComparators = new ArrayList<TypeComparator>(); + keyFields = new ArrayList<Field>(); + } + + + @Override + public void initializeTypeComparatorBuilder(int size) { + fieldComparators.ensureCapacity(size); + keyFields.ensureCapacity(size); + } + + @Override + public void addComparatorField(int fieldId, TypeComparator<?> comparator) { + fieldComparators.add(comparator); + keyFields.add(fields[fieldId].getField()); + } + + @Override + public TypeComparator<T> createTypeComparator(ExecutionConfig config) { + Preconditions.checkState( + keyFields.size() > 0, + "No keys were defined for the PojoTypeComparatorBuilder."); + + Preconditions.checkState( + fieldComparators.size() > 0, + "No type comparators were defined for the PojoTypeComparatorBuilder."); + + Preconditions.checkState( + keyFields.size() == fieldComparators.size(), + "Number of key fields and field comparators is not equal."); + + return new PojoComparator<T>( + keyFields.toArray(new Field[keyFields.size()]), + fieldComparators.toArray(new TypeComparator[fieldComparators.size()]), + createSerializer(config), + getTypeClass()); + } + } + + public static class NamedFlatFieldDescriptor 
extends FlatFieldDescriptor { + + private String fieldName; + + public NamedFlatFieldDescriptor(String name, int keyPosition, TypeInformation<?> type) { + super(keyPosition, type); + this.fieldName = name; + } + + public String getFieldName() { + return fieldName; + } + + @Override + public String toString() { + return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java new file mode 100644 index 0000000..5e0cbed --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * This interface can be implemented by functions and input formats to tell the framework + * about their produced data type. This method acts as an alternative to the reflection analysis + * that is otherwise performed and is useful in situations where the produced data type may vary + * depending on parametrization. + */ +public interface ResultTypeQueryable<T> { + + /** + * Gets the data type (as a {@link TypeInformation}) produced by this function or input format. + * + * @return The data type produced by this function or input format. + */ + TypeInformation<T> getProducedType(); +}