http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java deleted file mode 100644 index 276ffc4..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Preconditions; - -import org.apache.avro.generic.GenericData; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; -import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.objenesis.strategy.StdInstantiatorStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Objects; - -/** - * A type serializer that serializes its type using the Kryo serialization - * framework (https://github.com/EsotericSoftware/kryo). - * - * This serializer is intended as a fallback serializer for the cases that are - * not covered by the basic types, tuples, and POJOs. - * - * @param <T> The type to be serialized. - */ -public class KryoSerializer<T> extends TypeSerializer<T> { - - private static final long serialVersionUID = 3L; - - private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class); - - // ------------------------------------------------------------------------ - - private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers; - private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses; - private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers; - private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses; - private final LinkedHashSet<Class<?>> registeredTypes; - - private final Class<T> type; - - // ------------------------------------------------------------------------ - // The fields below are lazily initialized after duplication or deserialization. - - private transient Kryo kryo; - private transient T copyInstance; - - private transient DataOutputView previousOut; - private transient DataInputView previousIn; - - private transient Input input; - private transient Output output; - - // ------------------------------------------------------------------------ - - public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){ - this.type = Preconditions.checkNotNull(type); - - this.defaultSerializers = executionConfig.getDefaultKryoSerializers(); - this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses(); - this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers(); - this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses(); - this.registeredTypes = executionConfig.getRegisteredKryoTypes(); - } - - /** - * Copy-constructor that does not copy transient fields. They will be initialized once required. - */ - protected KryoSerializer(KryoSerializer<T> toCopy) { - registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers; - registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses; - defaultSerializers = toCopy.defaultSerializers; - defaultSerializerClasses = toCopy.defaultSerializerClasses; - registeredTypes = toCopy.registeredTypes; - - type = toCopy.type; - if(type == null){ - throw new NullPointerException("Type class cannot be null."); - } - } - - // ------------------------------------------------------------------------ - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public KryoSerializer<T> duplicate() { - return new KryoSerializer<T>(this); - } - - @Override - public T createInstance() { - if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) { - return null; - } else { - checkKryoInitialized(); - try { - return kryo.newInstance(type); - } catch(Throwable e) { - return null; - } - } - } - - @SuppressWarnings("unchecked") - @Override - public T copy(T from) { - if (from == null) { - return null; - } - checkKryoInitialized(); - try { - return kryo.copy(from); - } - catch(KryoException ke) { - // kryo was unable to copy it, so we do it through serialization: - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - Output output = new Output(baout); - - kryo.writeObject(output, from); - - output.close(); - - ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); - Input input = new Input(bain); - - return (T)kryo.readObject(input, from.getClass()); - } - } - - @Override - public T copy(T from, T reuse) { - return copy(from); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T record, DataOutputView target) throws IOException { - checkKryoInitialized(); - if (target != previousOut) { - DataOutputViewStream outputStream = new DataOutputViewStream(target); - output = new Output(outputStream); - previousOut = target; - } - - // Sanity check: Make sure that the output is cleared/has been flushed by the last call - // otherwise data might be written multiple times in case of a previous EOFException - if (output.position() != 0) { - throw new IllegalStateException("The Kryo Output still contains data from a previous " + - "serialize call. It has to be flushed or cleared at the end of the serialize call."); - } - - try { - kryo.writeClassAndObject(output, record); - output.flush(); - } - catch (KryoException ke) { - // make sure that the Kryo output buffer is cleared in case that we can recover from - // the exception (e.g. EOFException which denotes buffer full) - output.clear(); - - Throwable cause = ke.getCause(); - if (cause instanceof EOFException) { - throw (EOFException) cause; - } - else { - throw ke; - } - } - } - - @SuppressWarnings("unchecked") - @Override - public T deserialize(DataInputView source) throws IOException { - checkKryoInitialized(); - if (source != previousIn) { - DataInputViewStream inputStream = new DataInputViewStream(source); - input = new NoFetchingInput(inputStream); - previousIn = source; - } - - try { - return (T) kryo.readClassAndObject(input); - } catch (KryoException ke) { - Throwable cause = ke.getCause(); - - if (cause instanceof EOFException) { - throw (EOFException) cause; - } else { - throw ke; - } - } - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - checkKryoInitialized(); - if(this.copyInstance == null){ - this.copyInstance = createInstance(); - } - - T tmp = deserialize(copyInstance, source); - serialize(tmp, target); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return Objects.hash( - type, - registeredTypes, - registeredTypesWithSerializerClasses, - defaultSerializerClasses); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof KryoSerializer) { - KryoSerializer<?> other = (KryoSerializer<?>) obj; - - // we cannot include the Serializers here because they don't implement the equals method - return other.canEqual(this) && - type == other.type && - registeredTypes.equals(other.registeredTypes) && - registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) && - defaultSerializerClasses.equals(other.defaultSerializerClasses); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof KryoSerializer; - } - - // -------------------------------------------------------------------------------------------- - - /** - * Returns the Chill Kryo Serializer which is implictly added to the classpath via flink-runtime. - * Falls back to the default Kryo serializer if it can't be found. - * @return The Kryo serializer instance. - */ - private Kryo getKryoInstance() { - - try { - // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library). - // This will be true if Flink's Scala API is used. - Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator"); - Object chillInstantiator = chillInstantiatorClazz.newInstance(); - - // obtain a Kryo instance through Twitter Chill - Method m = chillInstantiatorClazz.getMethod("newKryo"); - - return (Kryo) m.invoke(chillInstantiator); - } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException | - IllegalAccessException | InvocationTargetException e) { - - LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e); - - Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy(); - initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - - Kryo kryo = new Kryo(); - kryo.setInstantiatorStrategy(initStrategy); - - return kryo; - } - } - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = getKryoInstance(); - - // disable reference tracking. reference tracking is costly, usually unnecessary, and - // inconsistent with Flink's own serialization (which does not do reference tracking) - kryo.setReferences(false); - - // Throwable and all subclasses should be serialized via java serialization - kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); - - // Add default serializers first, so that they type registrations without a serializer - // are registered with a default serializer - for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) { - kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer()); - } - - for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) { - kryo.addDefaultSerializer(entry.getKey(), entry.getValue()); - } - - // register the type of our class - kryo.register(type); - - // register given types. we do this first so that any registration of a - // more specific serializer overrides this - for (Class<?> type : registeredTypes) { - kryo.register(type); - } - - // register given serializer classes - for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) { - Class<?> typeClass = e.getKey(); - Class<? extends Serializer<?>> serializerClass = e.getValue(); - - Serializer<?> serializer = - ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass); - kryo.register(typeClass, serializer); - } - - // register given serializers - for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) { - kryo.register(e.getKey(), e.getValue().getSerializer()); - } - // this is needed for Avro but can not be added on demand. - kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList()); - - kryo.setRegistrationRequired(false); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - } - } - - // -------------------------------------------------------------------------------------------- - // For testing - // -------------------------------------------------------------------------------------------- - - public Kryo getKryo() { - checkKryoInitialized(); - return this.kryo; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java deleted file mode 100644 index 8bac729..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.CollectionSerializer; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.Utils; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -import java.io.Serializable; -import java.lang.reflect.Field; -import java.lang.reflect.GenericArrayType; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; - - -/** - * Class containing utilities for the serializers of the Flink Runtime. - * - * Most of the serializers are automatically added to the system. - * - * Note that users can also implement the {@link com.esotericsoftware.kryo.KryoSerializable} interface - * to provide custom serialization for their classes. - * Also, there is a Java Annotation for adding a default serializer (@DefaultSerializer) to classes. - */ -public class Serializers { - - public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) { - if (typeInfo instanceof GenericTypeInfo) { - GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo; - Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config, alreadySeen); - } - else if (typeInfo instanceof CompositeType) { - List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>(); - Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite); - for (GenericTypeInfo<?> gt : genericTypesInComposite) { - Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen); - } - } - else if (typeInfo instanceof ObjectArrayTypeInfo) { - ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = (ObjectArrayTypeInfo<?, ?>) typeInfo; - recursivelyRegisterType(objectArrayTypeInfo.getComponentInfo(), config, alreadySeen); - } - } - - public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config, Set<Class<?>> alreadySeen) { - // don't register or remember primitives - if (type == null || type.isPrimitive() || type == Object.class) { - return; - } - - // prevent infinite recursion for recursive types - if (!alreadySeen.add(type)) { - return; - } - - if (type.isArray()) { - recursivelyRegisterType(type.getComponentType(), config, alreadySeen); - } - else { - config.registerKryoType(type); - checkAndAddSerializerForTypeAvro(config, type); - - Field[] fields = type.getDeclaredFields(); - for (Field field : fields) { - if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { - continue; - } - Type fieldType = field.getGenericType(); - recursivelyRegisterGenericType(fieldType, config, alreadySeen); - } - } - } - - private static void recursivelyRegisterGenericType(Type fieldType, ExecutionConfig config, Set<Class<?>> alreadySeen) { - if (fieldType instanceof ParameterizedType) { - // field has generics - ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType; - - for (Type t: parameterizedFieldType.getActualTypeArguments()) { - if (TypeExtractor.isClassType(t) ) { - recursivelyRegisterType(TypeExtractor.typeToClass(t), config, alreadySeen); - } - } - - recursivelyRegisterGenericType(parameterizedFieldType.getRawType(), config, alreadySeen); - } - else if (fieldType instanceof GenericArrayType) { - GenericArrayType genericArrayType = (GenericArrayType) fieldType; - recursivelyRegisterGenericType(genericArrayType.getGenericComponentType(), config, alreadySeen); - } - else if (fieldType instanceof Class) { - Class<?> clazz = (Class<?>) fieldType; - recursivelyRegisterType(clazz, config, alreadySeen); - } - } - - private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) { - if (GenericData.Record.class.isAssignableFrom(type)) { - registerGenericAvro(reg); - } - if (SpecificRecordBase.class.isAssignableFrom(type)) { - @SuppressWarnings("unchecked") - Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type; - registerSpecificAvro(reg, specRecordClass); - } - } - - /** - * Register these serializers for using Avro's {@link GenericData.Record} and classes - * implementing {@link org.apache.avro.specific.SpecificRecordBase} - */ - private static void registerGenericAvro(ExecutionConfig reg) { - // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type - // because Kryo is not able to serialize them properly, we use this serializer for them - reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class); - - // We register this serializer for users who want to use untyped Avro records (GenericData.Record). - // Kryo is able to serialize everything in there, except for the Schema. - // This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea. - // we add the serializer as a default serializer because Avro is using a private sub-type at runtime. - reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); - } - - private static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) { - registerGenericAvro(reg); - // This rule only applies if users explicitly use the GenericTypeInformation for the avro types - // usually, we are able to handle Avro POJOs with the POJO serializer. - // (However only if the GenericData.Array type is registered!) - - // ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType); - // reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); - } - - // -------------------------------------------------------------------------------------------- - // Custom Serializers - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("rawtypes") - public static class SpecificInstanceCollectionSerializerForArrayList extends SpecificInstanceCollectionSerializer<ArrayList> { - private static final long serialVersionUID = 1L; - - public SpecificInstanceCollectionSerializerForArrayList() { - super(ArrayList.class); - } - } - /** - * Special serializer for Java collections enforcing certain instance types. - * Avro is serializing collections with an "GenericData.Array" type. Kryo is not able to handle - * this type, so we use ArrayLists. - */ - @SuppressWarnings("rawtypes") - public static class SpecificInstanceCollectionSerializer<T extends Collection> - extends CollectionSerializer implements Serializable - { - private static final long serialVersionUID = 1L; - - private Class<T> type; - - public SpecificInstanceCollectionSerializer(Class<T> type) { - this.type = type; - } - - @Override - protected Collection create(Kryo kryo, Input input, Class<Collection> type) { - return kryo.newInstance(this.type); - } - - @Override - protected Collection createCopy(Kryo kryo, Collection original) { - return kryo.newInstance(this.type); - } - } - - /** - * Slow serialization approach for Avro schemas. - * This is only used with {{@link org.apache.avro.generic.GenericData.Record}} types. - * Having this serializer, we are able to handle avro Records. - */ - public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void write(Kryo kryo, Output output, Schema object) { - String schemaAsString = object.toString(false); - output.writeString(schemaAsString); - } - - @Override - public Schema read(Kryo kryo, Input input, Class<Schema> type) { - String schemaAsString = input.readString(); - // the parser seems to be stateful, to we need a new one for every type. - Schema.Parser sParser = new Schema.Parser(); - return sParser.parse(schemaAsString); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java new file mode 100644 index 0000000..fb20d78 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeInfoParser; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests concerning type extraction by ExecutionEnvironment methods. + */ +@SuppressWarnings("serial") +public class TypeExtractionTest { + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testFunctionWithMissingGenericsAndReturns() { + + RichMapFunction function = new RichMapFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Object map(Object value) throws Exception { + return null; + } + }; + + TypeInformation<?> info = ExecutionEnvironment.getExecutionEnvironment() + .fromElements("arbitrary", "data") + .map(function).returns("String").getResultType(); + + assertEquals(TypeInfoParser.parse("String"), info); + } + + @Test + public void testGetterSetterWithVertex() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); + } + + // ------------------------------------------------------------------------ + // Test types + // ------------------------------------------------------------------------ + + public static class Vertex<K, V> { + + private K key1; + private K key2; + private V value; + + public Vertex() {} + + public Vertex(K key, V value) { + this.key1 = key; + this.key2 = key; + this.value = value; + } + + public Vertex(K key1, K key2, V value) { + this.key1 = key1; + this.key2 = key2; + this.value = value; + } + + public void setKey1(K key1) { + this.key1 = key1; + } + + public void setKey2(K key2) { + this.key2 = key2; + } + + public K getKey1() { + return key1; + } + + public K getKey2() { + return key2; + } + + public void setValue(V value) { + this.value = value; + } + + public V getValue() { + return value; + } + } + + public static class VertexTyped extends Vertex<Long, Double>{ + public VertexTyped(Long l, Double d) { + super(l, d); + } + public VertexTyped() { + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java deleted file mode 100644 index dd82234..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/ExpressionKeysTest.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.operators; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Arrays; - -import org.apache.commons.lang3.ArrayUtils; -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.operators.Keys.ExpressionKeys; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.operators.SelectorFunctionKeysTest.KeySelector1; -import org.apache.flink.api.java.operators.SelectorFunctionKeysTest.KeySelector3; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; - -@SuppressWarnings("unused") -@RunWith(PowerMockRunner.class) -public class ExpressionKeysTest { - - @Test - public void testBasicType() { - - TypeInformation<Long> longType = BasicTypeInfo.LONG_TYPE_INFO; - ExpressionKeys<Long> ek = new ExpressionKeys<>("*", longType); - - Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); - } - - @Test(expected = InvalidProgramException.class) - public void testGenericNonKeyType() { - // Fail: GenericType cannot be used as key - TypeInformation<GenericNonKeyType> genericType = new GenericTypeInfo<>(GenericNonKeyType.class); - new ExpressionKeys<>("*", genericType); - } - - @Test - public void testKeyGenericType() { - - TypeInformation<GenericKeyType> genericType = new GenericTypeInfo<>(GenericKeyType.class); - ExpressionKeys<GenericKeyType> ek = new ExpressionKeys<>("*", genericType); - - Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); - } - - @Test - public void testTupleRangeCheck() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { - - // test private static final int[] rangeCheckFields(int[] fields, int maxAllowedField) - Method rangeCheckFieldsMethod = Whitebox.getMethod(Keys.class, "rangeCheckFields", int[].class, int.class); - - // valid indexes - rangeCheckFieldsMethod.invoke(null, new int[]{1, 2, 3, 4}, 4); - - // corner case tests - rangeCheckFieldsMethod.invoke(null, new int[] {0}, 0); - - Throwable ex = null; - try { - // throws illegal argument. - rangeCheckFieldsMethod.invoke(null, new int[] {5}, 0); - } catch(Throwable iae) { - ex = iae; - } - Assert.assertNotNull(ex); - } - - @Test - public void testStandardTupleKeys() { - TupleTypeInfo<Tuple7<String, String, String, String, String, String, String>> typeInfo = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO); - - ExpressionKeys<Tuple7<String, String, String, String, String, String, String>> ek; - - for( int i = 1; i < 8; i++) { - int[] ints = new int[i]; - for( int j = 0; j < i; j++) { - ints[j] = j; - } - int[] inInts = Arrays.copyOf(ints, ints.length); // copy, just to make sure that the code is not cheating by changing the ints. - ek = new ExpressionKeys<>(inInts, typeInfo); - Assert.assertArrayEquals(ints, ek.computeLogicalKeyPositions()); - Assert.assertEquals(ints.length, ek.computeLogicalKeyPositions().length); - - ArrayUtils.reverse(ints); - inInts = Arrays.copyOf(ints, ints.length); - ek = new ExpressionKeys<>(inInts, typeInfo); - Assert.assertArrayEquals(ints, ek.computeLogicalKeyPositions()); - Assert.assertEquals(ints.length, ek.computeLogicalKeyPositions().length); - } - } - - @Test - public void testInvalidTuple() throws Throwable { - TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO), - BasicTypeInfo.STRING_TYPE_INFO); - - String[][] tests = new String[][] { - new String[] {"f0.f1"}, // nesting into unnested - new String[] {"f11"}, - new String[] {"f-35"}, - new String[] {"f0.f33"}, - new String[] {"f1.f33"} - }; - for (String[] test : tests) { - Throwable e = null; - try { - new ExpressionKeys<>(test, typeInfo); - } catch (Throwable t) { - e = t; - } - Assert.assertNotNull(e); - } - } - - @Test(expected = InvalidProgramException.class) - public void testTupleNonKeyField() { - // selected field is not a key type - TypeInformation<Tuple3<String, Long, GenericNonKeyType>> ti = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - TypeExtractor.getForClass(GenericNonKeyType.class) - ); - - new ExpressionKeys<>(2, ti); - } - - @Test - public void testTupleKeyExpansion() { - TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO), - BasicTypeInfo.STRING_TYPE_INFO); - ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>> fpk = - new ExpressionKeys<>(0, typeInfo); - Assert.assertArrayEquals(new int[] {0}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(1, typeInfo); - Assert.assertArrayEquals(new int[] {1,2,3}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(2, typeInfo); - Assert.assertArrayEquals(new int[] {4}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(new int[] {0,1,2}, typeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(null, typeInfo, true); // empty case - Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>("*", typeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); - - // scala style "select all" - fpk = new ExpressionKeys<>("_", typeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); - - // this was a bug: - fpk = new ExpressionKeys<>("f2", typeInfo); - Assert.assertArrayEquals(new int[] {4}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(new String[] {"f0","f1.f0","f1.f1", "f1.f2", "f2"}, typeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(new String[] {"f0","f1.f0","f1.f1", "f2"}, typeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,4}, fpk.computeLogicalKeyPositions()); - - fpk = new ExpressionKeys<>(new String[] {"f2", "f0"}, typeInfo); - Assert.assertArrayEquals(new int[] {4,0}, fpk.computeLogicalKeyPositions()); - - - TupleTypeInfo<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>> complexTypeInfo = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo<Tuple3<Tuple3<String, String, String>, String, String>>(new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO), - BasicTypeInfo.STRING_TYPE_INFO); - - ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>> complexFpk = - new ExpressionKeys<>(0, complexTypeInfo); - Assert.assertArrayEquals(new int[] {0}, complexFpk.computeLogicalKeyPositions()); - - complexFpk = new ExpressionKeys<>(new int[] {0,1,2}, complexTypeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions()); - - complexFpk = new ExpressionKeys<>("*", complexTypeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions()); - - // scala style select all - complexFpk = new ExpressionKeys<>("_", complexTypeInfo); - Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions()); - - complexFpk = new ExpressionKeys<>("f1.f0.*", complexTypeInfo); - Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions()); - - complexFpk = new ExpressionKeys<>("f1.f0", complexTypeInfo); - Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions()); - - complexFpk = new ExpressionKeys<>("f2", complexTypeInfo); - Assert.assertArrayEquals(new int[] {6}, complexFpk.computeLogicalKeyPositions()); - } - - @Test - public void testPojoKeys() { - TypeInformation<PojoWithMultiplePojos> ti = TypeExtractor.getForClass(PojoWithMultiplePojos.class); - ExpressionKeys<PojoWithMultiplePojos> ek; - ek = new ExpressionKeys<>("*", ti); - Assert.assertArrayEquals(new int[] {0,1,2,3,4}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>("p1.*", ti); - Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>("p2.*", ti); - Assert.assertArrayEquals(new int[] {3,4}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>("p1", ti); - Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>("p2", ti); - Assert.assertArrayEquals(new int[] {3,4}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>("i0", ti); - Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); - } - - @Test - public void testTupleWithNestedPojo() { - - TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti = - new TupleTypeInfo<>( - BasicTypeInfo.INT_TYPE_INFO, - TypeExtractor.getForClass(Pojo1.class), - TypeExtractor.getForClass(PojoWithMultiplePojos.class) - ); - - ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ek; - - ek = new ExpressionKeys<>(0, ti); - Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>(1, ti); - Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>(2, ti); - Assert.assertArrayEquals(new int[] {3,4,5,6,7}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>(new int[]{}, ti, true); - Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions()); - - ek = new ExpressionKeys<>("*", ti); - Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions()); - - } - - @Test(expected = InvalidProgramException.class) - public void testNonKeyPojoField() { - // selected field is not a key type - TypeInformation<PojoWithNonKeyField> ti = TypeExtractor.getForClass(PojoWithNonKeyField.class); - new ExpressionKeys<>("b", ti); - } - - @Test - public void testInvalidPojo() throws Throwable { - TypeInformation<ComplexNestedClass> ti = TypeExtractor.getForClass(ComplexNestedClass.class); - - String[][] tests = new String[][] { - new String[] {"nonexistent"}, - new String[] {"date.abc"} // nesting into unnested - }; - for (String[] test : tests) { - Throwable e = null; - try { - new ExpressionKeys<>(test, ti); - } catch (Throwable t) { - e = t; - } - Assert.assertNotNull(e); - } - } - - @Test - public void testAreCompatible1() throws Keys.IncompatibleKeysException { - TypeInformation<Pojo1> t1 = TypeExtractor.getForClass(Pojo1.class); - - ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>("a", t1); - ExpressionKeys<Pojo1> ek2 = new ExpressionKeys<>("b", t1); - - Assert.assertTrue(ek1.areCompatible(ek2)); - Assert.assertTrue(ek2.areCompatible(ek1)); - } - - @Test - public void testAreCompatible2() throws Keys.IncompatibleKeysException { - TypeInformation<Pojo1> t1 = TypeExtractor.getForClass(Pojo1.class); - TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>("a", t1); - ExpressionKeys<Tuple2<String, Long>> ek2 = new ExpressionKeys<>(0, t2); - - Assert.assertTrue(ek1.areCompatible(ek2)); - Assert.assertTrue(ek2.areCompatible(ek1)); - } - - @Test - public void testAreCompatible3() throws Keys.IncompatibleKeysException { - TypeInformation<String> t1 = BasicTypeInfo.STRING_TYPE_INFO; - TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - ExpressionKeys<String> ek1 = new ExpressionKeys<>("*", t1); - ExpressionKeys<Tuple2<String, Long>> ek2 = new ExpressionKeys<>(0, t2); - - Assert.assertTrue(ek1.areCompatible(ek2)); - Assert.assertTrue(ek2.areCompatible(ek1)); - } - - @Test - public void testAreCompatible4() throws Keys.IncompatibleKeysException { - TypeInformation<PojoWithMultiplePojos> t1 = TypeExtractor.getForClass(PojoWithMultiplePojos.class); - TypeInformation<Tuple3<String, Long, Integer>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - ); - - ExpressionKeys<PojoWithMultiplePojos> ek1 = new ExpressionKeys<>(new String[]{"p1", "i0"}, t1); - ExpressionKeys<Tuple3<String, Long, Integer>> ek2 = new ExpressionKeys<>(new int[]{0, 0, 2}, t2); - - Assert.assertTrue(ek1.areCompatible(ek2)); - Assert.assertTrue(ek2.areCompatible(ek1)); - } - - @Test - public void testAreCompatible5() throws Keys.IncompatibleKeysException { - TypeInformation<PojoWithMultiplePojos> t1 = TypeExtractor.getForClass(PojoWithMultiplePojos.class); - TypeInformation<Tuple2<String, String>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - ); - - ExpressionKeys<PojoWithMultiplePojos> ek1 = new ExpressionKeys<>(new String[]{"p1.b", "p2.a2"}, t1); - ExpressionKeys<Tuple2<String, String>> ek2 = new ExpressionKeys<>("*", t2); - - Assert.assertTrue(ek1.areCompatible(ek2)); - Assert.assertTrue(ek2.areCompatible(ek1)); - } - - @Test(expected = Keys.IncompatibleKeysException.class) - public void testAreCompatible6() throws Keys.IncompatibleKeysException { - TypeInformation<Pojo1> t1 = TypeExtractor.getForClass(Pojo1.class); - TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>("a", t1); - ExpressionKeys<Tuple2<String, Long>> ek2 = new ExpressionKeys<>(1, t2); - - ek1.areCompatible(ek2); - } - - @Test(expected = Keys.IncompatibleKeysException.class) - public void testAreCompatible7() throws Keys.IncompatibleKeysException { - TypeInformation<Pojo1> t1 = TypeExtractor.getForClass(Pojo1.class); - TypeInformation<Tuple2<String, Long>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - ExpressionKeys<Pojo1> ek1 = new ExpressionKeys<>(new String[]{"a", "b"}, t1); - ExpressionKeys<Tuple2<String, Long>> ek2 = new ExpressionKeys<>(0, t2); - - ek1.areCompatible(ek2); - } - - @Test - public void testAreCompatible8() throws Keys.IncompatibleKeysException { - TypeInformation<String> t1 = BasicTypeInfo.STRING_TYPE_INFO; - TypeInformation<Pojo2> t2 = TypeExtractor.getForClass(Pojo2.class); - - ExpressionKeys<String> ek1 = new ExpressionKeys<>("*", t1); - Keys<Pojo2> ek2 = new Keys.SelectorFunctionKeys<>( - new KeySelector1(), - t2, - BasicTypeInfo.STRING_TYPE_INFO - ); - - Assert.assertTrue(ek1.areCompatible(ek2)); - } - - @Test - public void testAreCompatible9() throws Keys.IncompatibleKeysException { - TypeInformation<Tuple3<String, Long, Integer>> t1 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - ); - TypeInformation<PojoWithMultiplePojos> t2 = TypeExtractor.getForClass(PojoWithMultiplePojos.class); - - ExpressionKeys<Tuple3<String, Long, Integer>> ek1 = new ExpressionKeys<>(new int[]{2,0}, t1); - Keys<PojoWithMultiplePojos> ek2 = new Keys.SelectorFunctionKeys<>( - new KeySelector3(), - t2, - new TupleTypeInfo<Tuple2<Integer, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) - ); - - Assert.assertTrue(ek1.areCompatible(ek2)); - } - - public static class Pojo1 { - public String a; - public String b; - } - - public static class Pojo2 { - public String a2; - public String b2; - } - - public static class PojoWithMultiplePojos { - public Pojo1 p1; - public Pojo2 p2; - public Integer i0; - } - - public static class PojoWithNonKeyField { - public String a; - public GenericNonKeyType b; - } - - public static class GenericNonKeyType { - private String a; - private String b; - } - - public static class GenericKeyType implements Comparable<GenericNonKeyType> { - private String a; - private String b; - - @Override - public int compareTo(GenericNonKeyType o) { - return 0; - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java index 5c2c7e9..09a705c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java @@ -48,7 +48,7 @@ public class NamesTest implements Serializable { public void testDefaultName() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<String> strs = env.fromCollection(Arrays.asList( new String[] {"a", "b"})); + DataSet<String> strs = env.fromCollection(Arrays.asList("a", "b")); // WARNING: The test will fail if this line is being moved down in the file (the line-number is hard-coded) @@ -68,7 +68,7 @@ public class NamesTest implements Serializable { public void testGivenName() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<String> strs = env.fromCollection(Arrays.asList( new String[] {"a", "b"})); + DataSet<String> strs = env.fromCollection(Arrays.asList("a", "b")); strs.filter(new FilterFunction<String>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java deleted file mode 100644 index f59af6e..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/SelectorFunctionKeysTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.operators.ExpressionKeysTest.PojoWithMultiplePojos; -import org.apache.flink.api.java.operators.ExpressionKeysTest.Pojo1; -import org.apache.flink.api.java.operators.ExpressionKeysTest.Pojo2; -import org.junit.Assert; -import org.junit.Test; - -public class SelectorFunctionKeysTest { - - @Test - public void testAreCompatible1() throws Keys.IncompatibleKeysException { - TypeInformation<Pojo2> t1 = TypeExtractor.getForClass(Pojo2.class); - TypeInformation<Tuple2<Integer, String>> t2 = - new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - - Keys<Pojo2> k1 = new Keys.SelectorFunctionKeys<>( - new KeySelector1(), - t1, - BasicTypeInfo.STRING_TYPE_INFO - ); - Keys<Tuple2<Integer, String>> k2 = new Keys.SelectorFunctionKeys<>( - new KeySelector2(), - t2, - BasicTypeInfo.STRING_TYPE_INFO - ); - - Assert.assertTrue(k1.areCompatible(k2)); - Assert.assertTrue(k2.areCompatible(k1)); - } - - @Test - public void testAreCompatible2() throws Keys.IncompatibleKeysException { - TypeInformation<PojoWithMultiplePojos> t1 = TypeExtractor.getForClass(PojoWithMultiplePojos.class); - TypeInformation<Tuple3<Long, Pojo1, Integer>> t2 = new TupleTypeInfo<>( - BasicTypeInfo.LONG_TYPE_INFO, - TypeExtractor.getForClass(Pojo1.class), - BasicTypeInfo.INT_TYPE_INFO); - TypeInformation<Tuple2<Integer, String>> kt = new TupleTypeInfo<>( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - ); - - Keys<PojoWithMultiplePojos> k1 = new Keys.SelectorFunctionKeys<>( - new KeySelector3(), - t1, - kt - ); - Keys<Tuple3<Long, Pojo1, Integer>> k2 = new Keys.SelectorFunctionKeys<>( - new KeySelector4(), - t2, - kt - ); - - Assert.assertTrue(k1.areCompatible(k2)); - Assert.assertTrue(k2.areCompatible(k1)); - } - - @Test - public void testAreCompatible3() throws Keys.IncompatibleKeysException { - TypeInformation<String> t1 = BasicTypeInfo.STRING_TYPE_INFO; - TypeInformation<Pojo2> t2 = TypeExtractor.getForClass(Pojo2.class); - - Keys.ExpressionKeys<String> ek1 = new Keys.ExpressionKeys<>("*", t1); - Keys<Pojo2> sk2 = new Keys.SelectorFunctionKeys<>( - new KeySelector1(), - t2, - BasicTypeInfo.STRING_TYPE_INFO - ); - - Assert.assertTrue(sk2.areCompatible(ek1)); - } - - @Test - public void testAreCompatible4() throws Keys.IncompatibleKeysException { - TypeInformation<Tuple3<String, Long, Integer>> t1 = new TupleTypeInfo<>( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - ); - TypeInformation<PojoWithMultiplePojos> t2 = TypeExtractor.getForClass(PojoWithMultiplePojos.class); - - Keys.ExpressionKeys<Tuple3<String, Long, Integer>> ek1 = new Keys.ExpressionKeys<>(new int[]{2,0}, t1); - Keys<PojoWithMultiplePojos> sk2 = new Keys.SelectorFunctionKeys<>( - new KeySelector3(), - t2, - new TupleTypeInfo<Tuple2<Integer, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) - ); - - Assert.assertTrue(sk2.areCompatible(ek1)); - } - - @SuppressWarnings("serial") - public static class KeySelector1 implements KeySelector<Pojo2, String> { - - @Override - public String getKey(Pojo2 v) throws Exception { - return v.b2; - } - } - - @SuppressWarnings("serial") - public static class KeySelector2 implements KeySelector<Tuple2<Integer, String>, String> { - - @Override - public String getKey(Tuple2<Integer, String> v) throws Exception { - return v.f1; - } - } - - @SuppressWarnings("serial") - public static class KeySelector3 implements KeySelector<PojoWithMultiplePojos, Tuple2<Integer, String>> { - - @Override - public Tuple2<Integer, String> getKey(PojoWithMultiplePojos v) throws Exception { - return new Tuple2<>(v.i0, v.p1.b); - } - } - - @SuppressWarnings("serial") - public static class KeySelector4 implements KeySelector<Tuple3<Long, Pojo1, Integer>, Tuple2<Integer, String>> { - - @Override - public Tuple2<Integer, String> getKey(Tuple3<Long, Pojo1, Integer> v) throws Exception { - return new Tuple2<>(v.f2, v.f1.a); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java index 8d8f801..a1ccfe4 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java @@ -35,7 +35,7 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java deleted file mode 100644 index 0cc5dcf..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/tuple/Tuple2Test.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.tuple; - -import org.apache.flink.types.NullFieldException; -import org.junit.Assert; -import org.junit.Test; - -public class Tuple2Test { - - @Test - public void testSwapValues() { - Tuple2<String, Integer> toSwap = new Tuple2<>("Test case", 25); - Tuple2<Integer, String> swapped = toSwap.swap(); - - Assert.assertEquals(swapped.f0, toSwap.f1); - - Assert.assertEquals(swapped.f1, toSwap.f0); - } - - @Test(expected = NullFieldException.class) - public void testGetFieldNotNull() { - Tuple2<String, Integer> tuple = new Tuple2<>("Test case", null); - - Assert.assertEquals("Test case", tuple.getFieldNotNull(0)); - tuple.getFieldNotNull(1); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java deleted file mode 100644 index 96ba264..0000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ /dev/null @@ -1,876 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.type.extractor; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.List; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.PojoField; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.collect.HashMultiset; - -/** - * Pojo Type tests - * - * A Pojo is a bean-style class with getters, setters and empty ctor - * OR a class with all fields public (or for every private field, there has to be a public getter/setter) - * everything else is a generic type (that can't be used for field selection) - */ -public class PojoTypeExtractionTest { - - public static class HasDuplicateField extends WC { - @SuppressWarnings("unused") - private int count; // duplicate - } - - @Test(expected=RuntimeException.class) - public void testDuplicateFieldException() { - TypeExtractor.createTypeInfo(HasDuplicateField.class); - } - - // test with correct pojo types - public static class WC { // is a pojo - public ComplexNestedClass complex; // is a pojo - private int count; // is a BasicType - - public WC() { - } - public int getCount() { - return count; - } - public void setCount(int c) { - this.count = c; - } - } - public static class ComplexNestedClass { // pojo - public static int ignoreStaticField; - public transient int ignoreTransientField; - public Date date; // generic type - public Integer someNumberWithÃnicödeNäme; // BasicType - public float someFloat; // BasicType - public Tuple3<Long, Long, String> word; //Tuple Type with three basic types - public Object nothing; // generic type - public MyWritable hadoopCitizen; // writableType - public List<String> collection; - } - - // all public test - public static class AllPublic extends ComplexNestedClass { - public ArrayList<String> somethingFancy; // generic type - public HashMultiset<Integer> fancyIds; // generic type - public String[] fancyArray; // generic type - } - - public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> { - public String field3; - } - public static class PojoWithGenerics<T1, T2> { - public int key; - public T1 field1; - public T2 field2; - } - - public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {} - public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {} - - // extends from Tuple and adds a field - public static class FromTuple extends Tuple3<String, String, Long> { - private static final long serialVersionUID = 1L; - public int special; - } - - public static class IncorrectPojo { - private int isPrivate; - public int getIsPrivate() { - return isPrivate; - } - // setter is missing (intentional) - } - - // correct pojo - public static class BeanStylePojo { - public String abc; - private int field; - public int getField() { - return this.field; - } - public void setField(int f) { - this.field = f; - } - } - public static class WrongCtorPojo { - public int a; - public WrongCtorPojo(int a) { - this.a = a; - } - } - - public static class PojoWithGenericFields { - private Collection<String> users; - private boolean favorited; - - public boolean isFavorited() { - return favorited; - } - - public void setFavorited(boolean favorited) { - this.favorited = favorited; - } - - public Collection<String> getUsers() { - return users; - } - - public void setUsers(Collection<String> users) { - this.users = users; - } - } - @Test - public void testPojoWithGenericFields() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenericFields.class); - - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - } - - - // in this test, the location of the getters and setters is mixed across the type hierarchy. - public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> { - public void setPackageProtected(String in) { - this.packageProtected = in; - } - } - public static class GenericPojoGetterSetterCheck<T> { - T packageProtected; - public T getPackageProtected() { - return packageProtected; - } - } - - @Test - public void testIncorrectPojos() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class); - Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); - - typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class); - Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>); - } - - @Test - public void testCorrectPojos() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - - typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - } - - @Test - public void testPojoWC() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class); - checkWCPojoAsserts(typeForClass); - - WC t = new WC(); - t.complex = new ComplexNestedClass(); - TypeInformation<?> typeForObject = TypeExtractor.getForObject(t); - checkWCPojoAsserts(typeForObject); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - private void checkWCPojoAsserts(TypeInformation<?> typeInfo) { - Assert.assertFalse(typeInfo.isBasicType()); - Assert.assertFalse(typeInfo.isTupleType()); - Assert.assertEquals(10, typeInfo.getTotalFields()); - Assert.assertTrue(typeInfo instanceof PojoTypeInfo); - PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo; - - List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); - String[] fields = {"count", - "complex.date", - "complex.hadoopCitizen", - "complex.collection", - "complex.nothing", - "complex.someFloat", - "complex.someNumberWithÃnicödeNäme", - "complex.word.f0", - "complex.word.f1", - "complex.word.f2"}; - int[] positions = {9, - 1, - 2, - 0, - 3, - 4, - 5, - 6, - 7, - 8}; - Assert.assertEquals(fields.length, positions.length); - for(int i = 0; i < fields.length; i++) { - pojoType.getFlatFields(fields[i], 0, ffd); - Assert.assertEquals("Too many keys returned", 1, ffd.size()); - Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition()); - ffd.clear(); - } - - pojoType.getFlatFields("complex.word.*", 0, ffd); - Assert.assertEquals(3, ffd.size()); - // check if it returns 5,6,7 - for(FlatFieldDescriptor ffdE : ffd) { - final int pos = ffdE.getPosition(); - Assert.assertTrue(pos <= 8 ); - Assert.assertTrue(6 <= pos ); - if(pos == 6) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 7) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 8) { - Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); - } - } - ffd.clear(); - - // scala style full tuple selection for pojos - pojoType.getFlatFields("complex.word._", 0, ffd); - Assert.assertEquals(3, ffd.size()); - ffd.clear(); - - pojoType.getFlatFields("complex.*", 0, ffd); - Assert.assertEquals(9, ffd.size()); - // check if it returns 0-7 - for(FlatFieldDescriptor ffdE : ffd) { - final int pos = ffdE.getPosition(); - Assert.assertTrue(ffdE.getPosition() <= 8 ); - Assert.assertTrue(0 <= ffdE.getPosition() ); - - if(pos == 0) { - Assert.assertEquals(List.class, ffdE.getType().getTypeClass()); - } - if(pos == 1) { - Assert.assertEquals(Date.class, ffdE.getType().getTypeClass()); - } - if(pos == 2) { - Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass()); - } - if(pos == 3) { - Assert.assertEquals(Object.class, ffdE.getType().getTypeClass()); - } - if(pos == 4) { - Assert.assertEquals(Float.class, ffdE.getType().getTypeClass()); - } - if(pos == 5) { - Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); - } - if(pos == 6) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 7) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } - if(pos == 8) { - Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); - } - if(pos == 9) { - Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); - } - } - ffd.clear(); - - pojoType.getFlatFields("*", 0, ffd); - Assert.assertEquals(10, ffd.size()); - // check if it returns 0-8 - for(FlatFieldDescriptor ffdE : ffd) { - Assert.assertTrue(ffdE.getPosition() <= 9 ); - Assert.assertTrue(0 <= ffdE.getPosition() ); - if(ffdE.getPosition() == 9) { - Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); - } - } - ffd.clear(); - - TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex - Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo); - - Assert.assertEquals(7, typeComplexNested.getArity()); - Assert.assertEquals(9, typeComplexNested.getTotalFields()); - PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested; - - boolean dateSeen = false, intSeen = false, floatSeen = false, - tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false; - for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) { - PojoField field = pojoTypeComplexNested.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("date")) { - if(dateSeen) { - Assert.fail("already seen"); - } - dateSeen = true; - Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation()); - Assert.assertEquals(Date.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("someNumberWithÃnicödeNäme")) { - if(intSeen) { - Assert.fail("already seen"); - } - intSeen = true; - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - Assert.assertEquals(Integer.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("someFloat")) { - if(floatSeen) { - Assert.fail("already seen"); - } - floatSeen = true; - Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation()); - Assert.assertEquals(Float.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("word")) { - if(tupleSeen) { - Assert.fail("already seen"); - } - tupleSeen = true; - Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>); - Assert.assertEquals(Tuple3.class, field.getTypeInformation().getTypeClass()); - // do some more advanced checks on the tuple - TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.getTypeInformation(); - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0)); - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1)); - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2)); - } else if(name.equals("nothing")) { - if(objectSeen) { - Assert.fail("already seen"); - } - objectSeen = true; - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); - Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("hadoopCitizen")) { - if(writableSeen) { - Assert.fail("already seen"); - } - writableSeen = true; - Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation()); - Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("collection")) { - if(collectionSeen) { - Assert.fail("already seen"); - } - collectionSeen = true; - Assert.assertEquals(new GenericTypeInfo(List.class), field.getTypeInformation()); - - } else { - Assert.fail("field "+field+" is not expected"); - } - } - Assert.assertTrue("Field was not present", dateSeen); - Assert.assertTrue("Field was not present", intSeen); - Assert.assertTrue("Field was not present", floatSeen); - Assert.assertTrue("Field was not present", tupleSeen); - Assert.assertTrue("Field was not present", objectSeen); - Assert.assertTrue("Field was not present", writableSeen); - Assert.assertTrue("Field was not present", collectionSeen); - - TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count - Assert.assertTrue(typeAtOne instanceof BasicTypeInfo); - - Assert.assertEquals(typeInfo.getTypeClass(), WC.class); - Assert.assertEquals(typeInfo.getArity(), 2); - } - - @Test - public void testPojoAllPublic() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class); - checkAllPublicAsserts(typeForClass); - - TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() ); - checkAllPublicAsserts(typeForObject); - } - - private void checkAllPublicAsserts(TypeInformation<?> typeInformation) { - Assert.assertTrue(typeInformation instanceof PojoTypeInfo); - Assert.assertEquals(10, typeInformation.getArity()); - Assert.assertEquals(12, typeInformation.getTotalFields()); - // check if the three additional fields are identified correctly - boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false; - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("somethingFancy")) { - if(arrayListSeen) { - Assert.fail("already seen"); - } - arrayListSeen = true; - Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo); - Assert.assertEquals(ArrayList.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("fancyIds")) { - if(multisetSeen) { - Assert.fail("already seen"); - } - multisetSeen = true; - Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo); - Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass()); - } else if(name.equals("fancyArray")) { - if(strArraySeen) { - Assert.fail("already seen"); - } - strArraySeen = true; - Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation()); - Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass()); - } else if(Arrays.asList("date", "someNumberWithÃnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) { - // ignore these, they are inherited from the ComplexNestedClass - } - else { - Assert.fail("field "+field+" is not expected"); - } - } - Assert.assertTrue("Field was not present", arrayListSeen); - Assert.assertTrue("Field was not present", multisetSeen); - Assert.assertTrue("Field was not present", strArraySeen); - } - - @Test - public void testPojoExtendingTuple() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class); - checkFromTuplePojo(typeForClass); - - FromTuple ft = new FromTuple(); - ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L; - TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft); - checkFromTuplePojo(typeForObject); - } - - private void checkFromTuplePojo(TypeInformation<?> typeInformation) { - Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>); - Assert.assertEquals(4, typeInformation.getTotalFields()); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("special")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - } else if(name.equals("f0") || name.equals("f1")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); - } else if(name.equals("f2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); - } else { - Assert.fail("unexpected field"); - } - } - } - - @Test - public void testPojoWithGenerics() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("field1")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("field2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("field3")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - /** - * Test if the TypeExtractor is accepting untyped generics, - * making them GenericTypes - */ - @Test - public void testPojoWithGenericsSomeFieldsGeneric() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("field1")) { - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); - } else if (name.equals("field2")) { - Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation()); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - - @Test - public void testPojoWithComplexHierarchy() { - TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class); - Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass; - for(int i = 0; i < pojoTypeForClass.getArity(); i++) { - PojoField field = pojoTypeForClass.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("field1")) { - Assert.assertTrue(field.getTypeInformation() instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!) - } else if (name.equals("field2")) { - Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>); - Assert.assertTrue( ((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) ); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - - public static class Vertex<K, V> { - - private K key1; - private K key2; - private V value; - - public Vertex() {} - - public Vertex(K key, V value) { - this.key1 = key; - this.key2 = key; - this.value = value; - } - - public Vertex(K key1, K key2, V value) { - this.key1 = key1; - this.key2 = key2; - this.value = value; - } - - public void setKey1(K key1) { - this.key1 = key1; - } - - public void setKey2(K key2) { - this.key2 = key2; - } - - public K getKey1() { - return key1; - } - - public K getKey2() { - return key2; - } - - public void setValue(V value) { - this.value = value; - } - - public V getValue() { - return value; - } - } - - public static class VertexTyped extends Vertex<Long, Double>{ - public VertexTyped(Long l, Double d) { - super(l, d); - } - public VertexTyped() { - } - } - - @Test - public void testGetterSetterWithVertex() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); - } - - public static class MyMapper<T> implements MapFunction<PojoWithGenerics<Long, T>, PojoWithGenerics<T,T>> { - private static final long serialVersionUID = 1L; - - @Override - public PojoWithGenerics<T, T> map(PojoWithGenerics<Long, T> value) - throws Exception { - return null; - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference1() { - MapFunction<?, ?> function = new MyMapper<String>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>")); - Assert.assertTrue(ti instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; - for(int i = 0; i < pti.getArity(); i++) { - PojoField field = pti.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("field1")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("field2")) { - Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("key")) { - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation()); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - public static class PojoTuple<A, B, C> extends Tuple3<B, C, Long> { - private static final long serialVersionUID = 1L; - - public A extraField; - } - - public static class MyMapper2<D, E> implements MapFunction<Tuple2<E, D>, PojoTuple<E, D, D>> { - private static final long serialVersionUID = 1L; - - @Override - public PojoTuple<E, D, D> map(Tuple2<E, D> value) throws Exception { - return null; - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference2() { - MapFunction<?, ?> function = new MyMapper2<Boolean, Character>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("Tuple2<Character,Boolean>")); - Assert.assertTrue(ti instanceof PojoTypeInfo<?>); - PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; - for(int i = 0; i < pti.getArity(); i++) { - PojoField field = pti.getPojoFieldAt(i); - String name = field.getField().getName(); - if(name.equals("extraField")) { - Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("f0")) { - Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("f1")) { - Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation()); - } else if (name.equals("f2")) { - Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation()); - } else { - Assert.fail("Unexpected field "+field); - } - } - } - - public static class MyMapper3<D, E> implements MapFunction<PojoTuple<E, D, D>, Tuple2<E, D>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<E, D> map(PojoTuple<E, D, D> value) throws Exception { - return null; - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference3() { - MapFunction<?, ?> function = new MyMapper3<Boolean, Character>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>")); - Assert.assertTrue(ti instanceof TupleTypeInfo<?>); - TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; - Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(0)); - Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(1)); - } - - public static class PojoWithParameterizedFields1<Z> { - public Tuple2<Z, Z> field; - } - - public static class MyMapper4<A> implements MapFunction<PojoWithParameterizedFields1<A>, A> { - private static final long serialVersionUID = 1L; - @Override - public A map(PojoWithParameterizedFields1<A> value) throws Exception { - return null; - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference4() { - MapFunction<?, ?> function = new MyMapper4<Byte>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>")); - Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti); - } - - public static class PojoWithParameterizedFields2<Z> { - public PojoWithGenerics<Z, Z> field; - } - - public static class MyMapper5<A> implements MapFunction<PojoWithParameterizedFields2<A>, A> { - private static final long serialVersionUID = 1L; - @Override - public A map(PojoWithParameterizedFields2<A> value) throws Exception { - return null; - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference5() { - MapFunction<?, ?> function = new MyMapper5<Byte>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields2<" - + "field=org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>" - + ">")); - Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti); - } - - public static class PojoWithParameterizedFields3<Z> { - public Z[] field; - } - - public static class MyMapper6<A> implements MapFunction<PojoWithParameterizedFields3<A>, A> { - private static final long serialVersionUID = 1L; - @Override - public A map(PojoWithParameterizedFields3<A> value) throws Exception { - return null; - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference6() { - MapFunction<?, ?> function = new MyMapper6<Integer>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields3<" - + "field=int[]" - + ">")); - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); - } - - public static class MyMapper7<A> implements MapFunction<PojoWithParameterizedFields4<A>, A> { - private static final long serialVersionUID = 1L; - @Override - public A map(PojoWithParameterizedFields4<A> value) throws Exception { - return null; - } - } - - public static class PojoWithParameterizedFields4<Z> { - public Tuple1<Z>[] field; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testGenericPojoTypeInference7() { - MapFunction<?, ?> function = new MyMapper7<Integer>(); - - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) - TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest$PojoWithParameterizedFields4<" - + "field=Tuple1<int>[]" - + ">")); - Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); - } - - public static class RecursivePojo1 { - public RecursivePojo1 field; - } - - public static class RecursivePojo2 { - public Tuple1<RecursivePojo2> field; - } - - public static class RecursivePojo3 { - public NestedPojo field; - } - - public static class NestedPojo { - public RecursivePojo3 field; - } - - @Test - public void testRecursivePojo1() { - TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class); - Assert.assertTrue(ti instanceof PojoTypeInfo); - Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation().getClass()); - } - - @Test - public void testRecursivePojo2() { - TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class); - Assert.assertTrue(ti instanceof PojoTypeInfo); - PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0); - Assert.assertTrue(pf.getTypeInformation() instanceof TupleTypeInfo); - Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.getTypeInformation()).getTypeAt(0).getClass()); - } - - @Test - public void testRecursivePojo3() { - TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class); - Assert.assertTrue(ti instanceof PojoTypeInfo); - PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0); - Assert.assertTrue(pf.getTypeInformation() instanceof PojoTypeInfo); - Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass()); - } - - public static class FooBarPojo { - public int foo, bar; - public FooBarPojo() {} - } - - public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> { - @Override - public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception { - return null; - } - } - - @Test - public void testDualUseOfPojo() { - MapFunction<?, ?> function = new DuplicateMapper(); - TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class)); - Assert.assertTrue(ti instanceof TupleTypeInfo); - TupleTypeInfo<?> tti = ((TupleTypeInfo) ti); - Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo); - Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo); - } -}