http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java deleted file mode 100644 index de24956..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ /dev/null @@ -1,592 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - - -public final class PojoSerializer<T> extends TypeSerializer<T> { - - // Flags for the header - private static byte IS_NULL = 1; - private static byte NO_SUBCLASS = 2; - private static byte IS_SUBCLASS = 4; - private static byte IS_TAGGED_SUBCLASS = 8; - - private static final long serialVersionUID = 1L; - - private final Class<T> clazz; - - private final TypeSerializer<Object>[] fieldSerializers; - - private final int numFields; - - private final Map<Class<?>, Integer> registeredClasses; - - private final TypeSerializer<?>[] registeredSerializers; - - private final ExecutionConfig executionConfig; - - private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache; - private transient ClassLoader cl; - // We need to handle these ourselves in writeObject()/readObject() - private transient Field[] fields; - - @SuppressWarnings("unchecked") - public PojoSerializer( - Class<T> clazz, - TypeSerializer<?>[] fieldSerializers, - Field[] fields, - ExecutionConfig executionConfig) { - - this.clazz = Preconditions.checkNotNull(clazz); - this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers); - this.fields = Preconditions.checkNotNull(fields); - this.numFields = fieldSerializers.length; - this.executionConfig = 
Preconditions.checkNotNull(executionConfig); - - LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); - - for (int i = 0; i < numFields; i++) { - this.fields[i].setAccessible(true); - } - - cl = Thread.currentThread().getContextClassLoader(); - - subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>(); - - // We only want those classes that are not our own class and are actually sub-classes. - List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size()); - for (Class<?> registeredClass: registeredPojoTypes) { - if (registeredClass.equals(clazz)) { - continue; - } - if (!clazz.isAssignableFrom(registeredClass)) { - continue; - } - cleanedTaggedClasses.add(registeredClass); - - } - this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size()); - registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()]; - - int id = 0; - for (Class<?> registeredClass: cleanedTaggedClasses) { - this.registeredClasses.put(registeredClass, id); - TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass); - registeredSerializers[id] = typeInfo.createSerializer(executionConfig); - - id++; - } - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - out.writeInt(fields.length); - for (Field field: fields) { - out.writeObject(field.getDeclaringClass()); - out.writeUTF(field.getName()); - } - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - int numFields = in.readInt(); - fields = new Field[numFields]; - for (int i = 0; i < numFields; i++) { - Class<?> clazz = (Class<?>)in.readObject(); - String fieldName = in.readUTF(); - fields[i] = null; - // try superclasses as well - while (clazz != null) { - try { - fields[i] = clazz.getDeclaredField(fieldName); - fields[i].setAccessible(true); - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } - if (fields[i] == null) { - throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup." 
- + " (" + fieldName + ")"); - } - } - - cl = Thread.currentThread().getContextClassLoader(); - subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>(); - } - - private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) { - TypeSerializer<?> result = subclassSerializerCache.get(subclass); - if (result == null) { - - TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(subclass); - result = typeInfo.createSerializer(executionConfig); - if (result instanceof PojoSerializer) { - PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) result; - subclassSerializer.copyBaseFieldOrder(this); - } - subclassSerializerCache.put(subclass, result); - - } - return result; - } - - @SuppressWarnings("unused") - private boolean hasField(Field f) { - for (Field field: fields) { - if (f.equals(field)) { - return true; - } - } - return false; - } - - private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) { - // do nothing for now, but in the future, adapt subclass serializer to have same - // ordering as base class serializer so that binary comparison on base class fields - // can work - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public PojoSerializer<T> duplicate() { - boolean stateful = false; - TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; - - for (int i = 0; i < fieldSerializers.length; i++) { - duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); - if (duplicateFieldSerializers[i] != fieldSerializers[i]) { - // at least one of them is stateful - stateful = true; - } - } - - if (stateful) { - return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig); - } else { - return this; - } - } - - - @Override - public T createInstance() { - if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) { - return null; - } - try { - T t = clazz.newInstance(); - initializeFields(t); - return t; - } - catch (Exception e) { - throw new RuntimeException("Cannot instantiate class.", e); - } - } - - protected void initializeFields(T t) { - for (int i = 0; i < numFields; i++) { - try { - fields[i].set(t, fieldSerializers[i].createInstance()); - } catch (IllegalAccessException e) { - throw new RuntimeException("Cannot initialize fields.", e); - } - } - } - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public T copy(T from) { - if (from == null) { - return null; - } - - Class<?> actualType = from.getClass(); - if (actualType == clazz) { - T target; - try { - target = (T) from.getClass().newInstance(); - } - catch (Throwable t) { - throw new RuntimeException("Cannot instantiate class.", t); - } - // no subclass - try { - for (int i = 0; i < numFields; i++) { - Object value = fields[i].get(from); - if (value != null) { - Object copy = fieldSerializers[i].copy(value); - fields[i].set(target, copy); - } - else { - fields[i].set(target, null); - } - } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before."); - - } - return target; - } else { - // subclass - TypeSerializer subclassSerializer = getSubclassSerializer(actualType); - return (T) subclassSerializer.copy(from); - } - } - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public T copy(T from, T reuse) { - if (from == null) { - return null; - } - - Class<?> actualType = from.getClass(); - if (reuse == null || actualType != reuse.getClass()) { - // cannot reuse, do a 
non-reuse copy - return copy(from); - } - - if (actualType == clazz) { - try { - for (int i = 0; i < numFields; i++) { - Object value = fields[i].get(from); - if (value != null) { - Object reuseValue = fields[i].get(reuse); - Object copy; - if(reuseValue != null) { - copy = fieldSerializers[i].copy(value, reuseValue); - } - else { - copy = fieldSerializers[i].copy(value); - } - fields[i].set(reuse, copy); - } - else { - fields[i].set(reuse, null); - } - } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); - } - } else { - TypeSerializer subclassSerializer = getSubclassSerializer(actualType); - reuse = (T) subclassSerializer.copy(from, reuse); - } - - return reuse; - } - - @Override - public int getLength() { - return -1; - } - - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public void serialize(T value, DataOutputView target) throws IOException { - int flags = 0; - // handle null values - if (value == null) { - flags |= IS_NULL; - target.writeByte(flags); - return; - } - - Integer subclassTag = -1; - Class<?> actualClass = value.getClass(); - TypeSerializer subclassSerializer = null; - if (clazz != actualClass) { - subclassTag = registeredClasses.get(actualClass); - if (subclassTag != null) { - flags |= IS_TAGGED_SUBCLASS; - subclassSerializer = registeredSerializers[subclassTag]; - } else { - flags |= IS_SUBCLASS; - subclassSerializer = getSubclassSerializer(actualClass); - } - } else { - flags |= NO_SUBCLASS; - } - - target.writeByte(flags); - - if ((flags & IS_SUBCLASS) != 0) { - target.writeUTF(actualClass.getName()); - } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { - target.writeByte(subclassTag); - } - - - if ((flags & NO_SUBCLASS) != 0) { - try { - for (int i = 0; i < numFields; i++) { - Object o = fields[i].get(value); - if (o == null) { - target.writeBoolean(true); // null field handling - } else { - target.writeBoolean(false); - fieldSerializers[i].serialize(o, target); - } - } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); - - } - } else { - // subclass - if (subclassSerializer != null) { - subclassSerializer.serialize(value, target); - } - } - } - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public T deserialize(DataInputView source) throws IOException { - int flags = source.readByte(); - if((flags & IS_NULL) != 0) { - return null; - } - - T target; - - Class<?> actualSubclass = null; - TypeSerializer subclassSerializer = null; - - if ((flags & IS_SUBCLASS) != 0) { - String subclassName = source.readUTF(); - try { - actualSubclass = Class.forName(subclassName, true, cl); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot instantiate class.", e); - } - subclassSerializer = getSubclassSerializer(actualSubclass); - target = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(target); - } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { - - int subclassTag = source.readByte(); - subclassSerializer = registeredSerializers[subclassTag]; - target = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(target); - } else { - target = createInstance(); - } - - if ((flags & NO_SUBCLASS) != 0) { - try { - for (int i = 0; i < numFields; i++) { - 
boolean isNull = source.readBoolean(); - if (isNull) { - fields[i].set(target, null); - } else { - Object field = fieldSerializers[i].deserialize(source); - fields[i].set(target, field); - } - } - } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); - - } - } else { - if (subclassSerializer != null) { - target = (T) subclassSerializer.deserialize(target, source); - } - } - return target; - } - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public T deserialize(T reuse, DataInputView source) throws IOException { - - // handle null values - int flags = source.readByte(); - if((flags & IS_NULL) != 0) { - return null; - } - - Class<?> subclass = null; - TypeSerializer subclassSerializer = null; - if ((flags & IS_SUBCLASS) != 0) { - String subclassName = source.readUTF(); - try { - subclass = Class.forName(subclassName, true, cl); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot instantiate class.", e); - } - subclassSerializer = getSubclassSerializer(subclass); - - if (reuse == null || subclass != reuse.getClass()) { - // cannot reuse - reuse = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(reuse); - } - } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { - int subclassTag = source.readByte(); - subclassSerializer = registeredSerializers[subclassTag]; - - if (reuse == null || ((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) { - // cannot reuse - reuse = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(reuse); - } - } else { - if (reuse == null || clazz != reuse.getClass()) { - reuse = createInstance(); - } - } - - if ((flags & NO_SUBCLASS) != 0) { - try { - for (int i = 0; i < numFields; i++) { - boolean isNull = source.readBoolean(); - if (isNull) { - fields[i].set(reuse, null); - } else { - Object field; - - Object reuseField = fields[i].get(reuse); - if(reuseField != null) { - field = fieldSerializers[i].deserialize(reuseField, source); - } - else { - field = fieldSerializers[i].deserialize(source); - } - - fields[i].set(reuse, field); - } - } - } catch (IllegalAccessException e) { - throw new RuntimeException( - "Error during POJO copy, this should not happen since we check the fields before."); - } - } else { - if (subclassSerializer != null) { - reuse = (T) subclassSerializer.deserialize(reuse, source); - } - } - - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - // copy the flags - int flags = source.readByte(); - target.writeByte(flags); - - if ((flags & IS_NULL) != 0) { - // is a null value, nothing further to copy - return; - } - - TypeSerializer<?> subclassSerializer = null; - if ((flags & IS_SUBCLASS) != 0) { - String className = source.readUTF(); - target.writeUTF(className); - try { - Class<?> subclass = Class.forName(className, true, Thread.currentThread() - .getContextClassLoader()); - subclassSerializer = getSubclassSerializer(subclass); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot instantiate class.", e); - } - } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { - int subclassTag = source.readByte(); - target.writeByte(subclassTag); - subclassSerializer = registeredSerializers[subclassTag]; - } - - if ((flags & NO_SUBCLASS) != 0) { - for (int i = 
0; i < numFields; i++) { - boolean isNull = source.readBoolean(); - target.writeBoolean(isNull); - if (!isNull) { - fieldSerializers[i].copy(source, target); - } - } - } else { - if (subclassSerializer != null) { - subclassSerializer.copy(source, target); - } - } - } - - @Override - public int hashCode() { - return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) + - Objects.hash(clazz, numFields, registeredClasses); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PojoSerializer) { - PojoSerializer<?> other = (PojoSerializer<?>) obj; - - return other.canEqual(this) && - clazz == other.clazz && - Arrays.equals(fieldSerializers, other.fieldSerializers) && - Arrays.equals(registeredSerializers, other.registeredSerializers) && - numFields == other.numFields && - registeredClasses.equals(other.registeredClasses); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof PojoSerializer; - } -}
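A side note on the wire format implemented by the deleted PojoSerializer above: every record starts with a one-byte header whose bits record whether the value was null, an instance of the declared POJO class, a registered (tagged) subclass identified by a one-byte tag, or an unregistered subclass written with its full class name. The following standalone sketch is illustrative only and is not part of the Flink sources; the class and method names are hypothetical, while the flag constants mirror the values in the serializer above.

// Illustrative sketch of the header-flag scheme used by the deleted PojoSerializer.
// The constant values mirror those in the class above; everything else is hypothetical.
public final class PojoHeaderFlagsDemo {

    private static final byte IS_NULL = 1;
    private static final byte NO_SUBCLASS = 2;
    private static final byte IS_SUBCLASS = 4;
    private static final byte IS_TAGGED_SUBCLASS = 8;

    /** Builds the header byte the same way serialize(...) above composes it. */
    static int headerFor(Object value, Class<?> declaredClass, boolean registeredSubclass) {
        int flags = 0;
        if (value == null) {
            return flags | IS_NULL;              // for null values only the header is written
        }
        if (value.getClass() == declaredClass) {
            flags |= NO_SUBCLASS;                // field data follows directly
        } else if (registeredSubclass) {
            flags |= IS_TAGGED_SUBCLASS;         // a one-byte subclass tag follows
        } else {
            flags |= IS_SUBCLASS;                // the subclass name follows as a UTF string
        }
        return flags;
    }

    static String describe(int flags) {
        if ((flags & IS_NULL) != 0) {
            return "null value";
        } else if ((flags & NO_SUBCLASS) != 0) {
            return "declared class, fields follow";
        } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
            return "registered subclass, tag follows";
        } else {
            return "unregistered subclass, class name follows";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(headerFor(null, String.class, false)));
        System.out.println(describe(headerFor("abc", String.class, false)));
        System.out.println(describe(headerFor("abc", CharSequence.class, true)));
        System.out.println(describe(headerFor("abc", CharSequence.class, false)));
    }
}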
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java deleted file mode 100644 index 4b734a7..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.InstantiationUtil; - -public final class RuntimeComparatorFactory<T> implements TypeComparatorFactory<T>, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - - private static final String CONFIG_KEY = "SER_DATA"; - - private TypeComparator<T> comparator; - - - public RuntimeComparatorFactory() {} - - public RuntimeComparatorFactory(TypeComparator<T> comparator) { - this.comparator = comparator; - } - - @Override - public void writeParametersToConfig(Configuration config) { - try { - InstantiationUtil.writeObjectToConfig(comparator, config, CONFIG_KEY); - } - catch (Exception e) { - throw new RuntimeException("Could not serialize comparator into the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { - try { - comparator = (TypeComparator<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY, cl); - } - catch (ClassNotFoundException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException("Could not serialize serializer into the configuration.", e); - } - } - - @Override - public TypeComparator<T> createComparator() { - if (comparator != null) { - return comparator; - } else { - throw new RuntimeException("ComparatorFactory has not been initialized from configuration."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java deleted file mode 100644 index 31e28f7..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.GenericPairComparator; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; - -public final class RuntimePairComparatorFactory<T1, T2> - implements TypePairComparatorFactory<T1, T2>, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public TypePairComparator<T1, T2> createComparator12( - TypeComparator<T1> comparator1, - TypeComparator<T2> comparator2) { - return new GenericPairComparator<T1, T2>(comparator1, comparator2); - } - - @Override - public TypePairComparator<T2, T1> createComparator21( - TypeComparator<T1> comparator1, - TypeComparator<T2> comparator2) { - return new GenericPairComparator<T2, T1>(comparator2, comparator1); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java deleted file mode 100644 index 96aff73..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.InstantiationUtil; - -public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - - private static final String CONFIG_KEY_SER = "SER_DATA"; - - private static final String CONFIG_KEY_CLASS = "CLASS_DATA"; - - - private TypeSerializer<T> serializer; - - private boolean firstSerializer = true; - - private Class<T> clazz; - - // Because we read the class from the TaskConfig and instantiate ourselves - public RuntimeSerializerFactory() {} - - public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) { - if (serializer == null || clazz == null) { - throw new NullPointerException(); - } - - this.clazz = clazz; - this.serializer = serializer; - } - - - @Override - public void writeParametersToConfig(Configuration config) { - try { - InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS); - InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER); - } - catch (Exception e) { - throw new RuntimeException("Could not serialize serializer into the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { - if (config == null || cl == null) { - throw new NullPointerException(); - } - - try { - this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); - this.serializer = (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl); - firstSerializer = true; - } - catch (ClassNotFoundException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException("Could not load deserializer from the configuration.", e); - } - } - - @Override - public TypeSerializer<T> getSerializer() { - if (this.serializer != null) { - if (firstSerializer) { - firstSerializer = false; - return this.serializer; - } else { - return this.serializer.duplicate(); - } - } else { - throw new RuntimeException("SerializerFactory has not been initialized from configuration."); - } - } - - @Override - public Class<T> getDataType() { - return clazz; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return clazz.hashCode() ^ serializer.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof RuntimeSerializerFactory) { - RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj; - - return this.clazz == other.clazz && - this.serializer.equals(other.serializer); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java deleted file mode 100644 index a06ff1a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java +++ 
/dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class Tuple0Serializer extends TupleSerializer<Tuple0> { - - private static final long serialVersionUID = 1278813169022975971L; - - public static final Tuple0Serializer INSTANCE = new Tuple0Serializer(); - - // ------------------------------------------------------------------------ - - private Tuple0Serializer() { - super(Tuple0.class, new TypeSerializer<?>[0]); - } - - // ------------------------------------------------------------------------ - - @Override - public Tuple0Serializer duplicate() { - return this; - } - - @Override - public Tuple0 createInstance() { - return Tuple0.INSTANCE; - } - - @Override - public Tuple0 createInstance(Object[] fields) { - if (fields == null || fields.length == 0) { - return Tuple0.INSTANCE; - } - - throw new UnsupportedOperationException( - "Tuple0 cannot take any data, as it has zero fields."); - } - - @Override - public Tuple0 copy(Tuple0 from) { - return from; - } - - @Override - public Tuple0 copy(Tuple0 from, Tuple0 reuse) { - return reuse; - } - - @Override - public int getLength() { - return 1; - } - - @Override - public void serialize(Tuple0 record, DataOutputView target) throws IOException { - target.writeByte(42); - } - - @Override - public Tuple0 deserialize(DataInputView source) throws IOException { - source.readByte(); - return Tuple0.INSTANCE; - } - - @Override - public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws IOException { - source.readByte(); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - target.writeByte(source.readByte()); - } - - // ------------------------------------------------------------------------ - - - @Override - public int hashCode() { - return Tuple0Serializer.class.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Tuple0Serializer) { - Tuple0Serializer other = (Tuple0Serializer) obj; - - return other.canEqual(this); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof Tuple0Serializer; - } - - @Override - public String toString() { - return "Tuple0Serializer"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java deleted file mode 100644 index 875ecc2..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.KeyFieldOutOfBoundsException; -import org.apache.flink.types.NullFieldException; -import org.apache.flink.types.NullKeyFieldException; - - -public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> { - - private static final long serialVersionUID = 1L; - - public TupleComparator(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) { - super(keyPositions, comparators, serializers); - } - - private TupleComparator(TupleComparator<T> toClone) { - super(toClone); - } - - // -------------------------------------------------------------------------------------------- - // Comparator Methods - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("unchecked") - @Override - public int hash(T value) { - int i = 0; - try { - int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0])); - for (i = 1; i < this.keyPositions.length; i++) { - code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component - code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i])); - } - return code; - } - catch (NullFieldException nfex) { - throw new NullKeyFieldException(nfex); - } - catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(keyPositions[i]); - } - } - - @SuppressWarnings("unchecked") - @Override - public void setReference(T toCompare) { - int i = 0; - try { - for (; i < this.keyPositions.length; i++) { - this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i])); - } - } - catch (NullFieldException nfex) { - throw new NullKeyFieldException(nfex); - } - catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(keyPositions[i]); - } - } - - @SuppressWarnings("unchecked") - @Override - public boolean equalToReference(T candidate) { - int i = 0; - try { - for (; i < this.keyPositions.length; i++) { - if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) { - return 
false; - } - } - return true; - } - catch (NullFieldException nfex) { - throw new NullKeyFieldException(nfex); - } - catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(keyPositions[i]); - } - } - - @SuppressWarnings("unchecked") - @Override - public int compare(T first, T second) { - int i = 0; - try { - for (; i < keyPositions.length; i++) { - int keyPos = keyPositions[i]; - int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos)); - - if (cmp != 0) { - return cmp; - } - } - return 0; - } - catch (NullFieldException nfex) { - throw new NullKeyFieldException(nfex); - } - catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(keyPositions[i]); - } - } - - @SuppressWarnings("unchecked") - @Override - public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) { - int i = 0; - try { - for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) { - int len = this.normalizedKeyLengths[i]; - len = numBytes >= len ? len : numBytes; - this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len); - numBytes -= len; - offset += len; - } - } catch (NullFieldException nfex) { - throw new NullKeyFieldException(nfex); - } catch (NullPointerException npex) { - throw new NullKeyFieldException(this.keyPositions[i]); - } - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - int localIndex = index; - for(int i = 0; i < comparators.length; i++) { - localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex); - } - return localIndex - index; - } - - public TypeComparator<T> duplicate() { - return new TupleComparator<T>(this); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java deleted file mode 100644 index 28169e5..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; -import java.util.List; - -import org.apache.flink.api.common.typeutils.CompositeTypeComparator; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.KeyFieldOutOfBoundsException; -import org.apache.flink.types.NullKeyFieldException; - - -public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - /** key positions describe which fields are keys in what order */ - protected int[] keyPositions; - - /** comparators for the key fields, in the same order as the key fields */ - @SuppressWarnings("rawtypes") - protected TypeComparator[] comparators; - - protected int[] normalizedKeyLengths; - - protected int numLeadingNormalizableKeys; - - protected int normalizableKeyPrefixLen; - - protected boolean invertNormKey; - - - /** serializers to deserialize the first n fields for comparison */ - @SuppressWarnings("rawtypes") - protected TypeSerializer[] serializers; - - // cache for the deserialized field objects - protected transient Object[] deserializedFields1; - protected transient Object[] deserializedFields2; - - - @SuppressWarnings("unchecked") - public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) { - // set the default utils - this.keyPositions = keyPositions; - this.comparators = (TypeComparator<Object>[]) comparators; - this.serializers = (TypeSerializer<Object>[]) serializers; - - // set up auxiliary fields for normalized key support - this.normalizedKeyLengths = new int[keyPositions.length]; - int nKeys = 0; - int nKeyLen = 0; - boolean inverted = false; - - for (int i = 0; i < this.keyPositions.length; i++) { - TypeComparator<?> k = this.comparators[i]; - - // as long as the leading keys support normalized keys, we can build up the composite key - if (k.supportsNormalizedKey()) { - if (i == 0) { - // the first comparator decides whether we need to invert the key direction - inverted = k.invertNormalizedKey(); - } - else if (k.invertNormalizedKey() != inverted) { - // if a successor does not agree on the inversion direction, it cannot be part of the normalized key - break; - } - - nKeys++; - final int len = k.getNormalizeKeyLen(); - if (len < 0) { - throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len); - } - this.normalizedKeyLengths[i] = len; - nKeyLen += len; - - if (nKeyLen < 0) { - // overflow, which means we are out of budget for normalized key space anyways - nKeyLen = Integer.MAX_VALUE; - break; - } - } else { - break; - } - } - this.numLeadingNormalizableKeys = nKeys; - this.normalizableKeyPrefixLen = nKeyLen; - this.invertNormKey = inverted; - } - - protected TupleComparatorBase(TupleComparatorBase<T> toClone) { - privateDuplicate(toClone); - } - - // We need this because we cannot call the cloning constructor from the - // ScalaTupleComparator - protected void privateDuplicate(TupleComparatorBase<T> toClone) { - // copy fields and serializer factories - this.keyPositions = toClone.keyPositions; - - this.serializers = new TypeSerializer[toClone.serializers.length]; - for (int i = 0; i < toClone.serializers.length; i++) { - this.serializers[i] = 
toClone.serializers[i].duplicate(); - } - - this.comparators = new TypeComparator[toClone.comparators.length]; - for (int i = 0; i < toClone.comparators.length; i++) { - this.comparators[i] = toClone.comparators[i].duplicate(); - } - - this.normalizedKeyLengths = toClone.normalizedKeyLengths; - this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys; - this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen; - this.invertNormKey = toClone.invertNormKey; - } - - // -------------------------------------------------------------------------------------------- - // Comparator Methods - // -------------------------------------------------------------------------------------------- - - protected int[] getKeyPositions() { - return this.keyPositions; - } - - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void getFlatComparator(List<TypeComparator> flatComparators) { - for(int i = 0; i < comparators.length; i++) { - if(comparators[i] instanceof CompositeTypeComparator) { - ((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators); - } else { - flatComparators.add(comparators[i]); - } - } - } - // -------------------------------------------------------------------------------------------- - // Comparator Methods - // -------------------------------------------------------------------------------------------- - - - @Override - public int compareToReference(TypeComparator<T> referencedComparator) { - TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator; - - int i = 0; - try { - for (; i < this.keyPositions.length; i++) { - @SuppressWarnings("unchecked") - int cmp = this.comparators[i].compareToReference(other.comparators[i]); - if (cmp != 0) { - return cmp; - } - } - return 0; - } - catch (NullPointerException npex) { - throw new NullKeyFieldException(keyPositions[i]); - } - catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(keyPositions[i]); - } - } - - @SuppressWarnings("unchecked") - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - if (deserializedFields1 == null) { - instantiateDeserializationUtils(); - } - - int i = 0; - try { - for (; i < serializers.length; i++) { - deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource); - deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource); - } - - for (i = 0; i < keyPositions.length; i++) { - int keyPos = keyPositions[i]; - int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]); - if (cmp != 0) { - return cmp; - } - } - - return 0; - } catch (NullPointerException npex) { - throw new NullKeyFieldException(keyPositions[i]); - } catch (IndexOutOfBoundsException iobex) { - throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex); - } - } - - @Override - public boolean supportsNormalizedKey() { - return this.numLeadingNormalizableKeys > 0; - } - - @Override - public int getNormalizeKeyLen() { - return this.normalizableKeyPrefixLen; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return this.numLeadingNormalizableKeys < this.keyPositions.length || - this.normalizableKeyPrefixLen == Integer.MAX_VALUE || - this.normalizableKeyPrefixLen > keyBytes; - } - - @Override - public boolean invertNormalizedKey() { - return this.invertNormKey; - } - - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - 
@Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - // -------------------------------------------------------------------------------------------- - - protected final void instantiateDeserializationUtils() { - this.deserializedFields1 = new Object[this.serializers.length]; - this.deserializedFields2 = new Object[this.serializers.length]; - - for (int i = 0; i < this.serializers.length; i++) { - this.deserializedFields1[i] = this.serializers[i].createInstance(); - this.deserializedFields2[i] = this.serializers[i].createInstance(); - } - } - - // -------------------------------------------------------------------------------------------- - - /** - * A sequence of prime numbers to be used for salting the computed hash values. - * Based on some empirical evidence, we are using a 32-element subsequence of the - * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime). - * - * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers">http://en.wikipedia.org/wiki/List_of_prime_numbers</a> - * @see <a href="http://oeis.org/A068652">http://oeis.org/A068652</a> - */ - public static final int[] HASH_SALT = new int[] { - 73 , 79 , 97 , 113 , 131 , 197 , 199 , 311 , - 337 , 373 , 719 , 733 , 919 , 971 , 991 , 1193 , - 1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , - 19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 }; -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java deleted file mode 100644 index 0897063..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.NullFieldException; - - -public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> { - - private static final long serialVersionUID = 1L; - - public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { - super(tupleClass, fieldSerializers); - } - - @Override - public TupleSerializer<T> duplicate() { - boolean stateful = false; - TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer<?>[fieldSerializers.length]; - - for (int i = 0; i < fieldSerializers.length; i++) { - duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); - if (duplicateFieldSerializers[i] != fieldSerializers[i]) { - // at least one of them is stateful - stateful = true; - } - } - - if (stateful) { - return new TupleSerializer<T>(tupleClass, duplicateFieldSerializers); - } else { - return this; - } - } - - @Override - public T createInstance() { - try { - T t = tupleClass.newInstance(); - - for (int i = 0; i < arity; i++) { - t.setField(fieldSerializers[i].createInstance(), i); - } - - return t; - } - catch (Exception e) { - throw new RuntimeException("Cannot instantiate tuple.", e); - } - } - - @Override - public T createInstance(Object[] fields) { - - try { - T t = tupleClass.newInstance(); - - for (int i = 0; i < arity; i++) { - t.setField(fields[i], i); - } - - return t; - } - catch (Exception e) { - throw new RuntimeException("Cannot instantiate tuple.", e); - } - } - - @Override - public T createOrReuseInstance(Object[] fields, T reuse) { - for (int i = 0; i < arity; i++) { - reuse.setField(fields[i], i); - } - return reuse; - } - - @Override - public T copy(T from) { - T target = instantiateRaw(); - for (int i = 0; i < arity; i++) { - Object copy = fieldSerializers[i].copy(from.getField(i)); - target.setField(copy, i); - } - return target; - } - - @Override - public T copy(T from, T reuse) { - for (int i = 0; i < arity; i++) { - Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i)); - reuse.setField(copy, i); - } - - return reuse; - } - - @Override - public void serialize(T value, DataOutputView target) throws IOException { - for (int i = 0; i < arity; i++) { - Object o = value.getField(i); - try { - fieldSerializers[i].serialize(o, target); - } catch (NullPointerException npex) { - throw new NullFieldException(i, npex); - } - } - } - - @Override - public T deserialize(DataInputView source) throws IOException { - T tuple = instantiateRaw(); - for (int i = 0; i < arity; i++) { - Object field = fieldSerializers[i].deserialize(source); - tuple.setField(field, i); - } - return tuple; - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - for (int i = 0; i < arity; i++) { - Object field = fieldSerializers[i].deserialize(reuse.getField(i), source); - reuse.setField(field, i); - } - return reuse; - } - - private T instantiateRaw() { - try { - return tupleClass.newInstance(); - } - catch (Exception e) { - throw new RuntimeException("Cannot instantiate tuple.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java 
---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java deleted file mode 100644 index fc657a1..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Objects; - - -public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { - - private static final long serialVersionUID = 1L; - - protected final Class<T> tupleClass; - - protected final TypeSerializer<Object>[] fieldSerializers; - - protected final int arity; - - @SuppressWarnings("unchecked") - public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { - this.tupleClass = Preconditions.checkNotNull(tupleClass); - this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers); - this.arity = fieldSerializers.length; - } - - public Class<T> getTupleClass() { - return this.tupleClass; - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public int getLength() { - return -1; - } - - public int getArity() { - return arity; - } - - // We use this in the Aggregate and Distinct Operators to create instances - // of immutable Typles (i.e. 
Scala Tuples) - public abstract T createInstance(Object[] fields); - - public abstract T createOrReuseInstance(Object[] fields, T reuse); - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - for (int i = 0; i < arity; i++) { - fieldSerializers[i].copy(source, target); - } - } - - @Override - public int hashCode() { - return 31 * Arrays.hashCode(fieldSerializers) + Objects.hash(tupleClass, arity); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TupleSerializerBase) { - TupleSerializerBase<?> other = (TupleSerializerBase<?>) obj; - - return other.canEqual(this) && - tupleClass == other.tupleClass && - Arrays.equals(fieldSerializers, other.fieldSerializers) && - arity == other.arity; - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof TupleSerializerBase; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java deleted file mode 100644 index 4b9629a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.NormalizableKey; -import org.apache.flink.types.Value; -import org.apache.flink.util.InstantiationUtil; - -import com.esotericsoftware.kryo.Kryo; -import org.objenesis.strategy.StdInstantiatorStrategy; - -/** - * Comparator for all Value types that extend Key - */ -public class ValueComparator<T extends Value & Comparable<T>> extends TypeComparator<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> type; - - private final boolean ascendingComparison; - - private transient T reference; - - private transient T tempReference; - - private transient Kryo kryo; - - @SuppressWarnings("rawtypes") - private final TypeComparator[] comparators = new TypeComparator[] {this}; - - public ValueComparator(boolean ascending, Class<T> type) { - this.type = type; - this.ascendingComparison = ascending; - } - - @Override - public int hash(T record) { - return record.hashCode(); - } - - @Override - public void setReference(T toCompare) { - checkKryoInitialized(); - - reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer<T>(type)); - } - - @Override - public boolean equalToReference(T candidate) { - return candidate.equals(this.reference); - } - - @Override - public int compareToReference(TypeComparator<T> referencedComparator) { - T otherRef = ((ValueComparator<T>) referencedComparator).reference; - int comp = otherRef.compareTo(reference); - return ascendingComparison ? comp : -comp; - } - - @Override - public int compare(T first, T second) { - int comp = first.compareTo(second); - return ascendingComparison ? comp : -comp; - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - if (reference == null) { - reference = InstantiationUtil.instantiate(type, Value.class); - } - if (tempReference == null) { - tempReference = InstantiationUtil.instantiate(type, Value.class); - } - - reference.read(firstSource); - tempReference.read(secondSource); - int comp = reference.compareTo(tempReference); - return ascendingComparison ? 
comp : -comp; - } - - @Override - public boolean supportsNormalizedKey() { - return NormalizableKey.class.isAssignableFrom(type); - } - - @Override - public int getNormalizeKeyLen() { - if (reference == null) { - reference = InstantiationUtil.instantiate(type, Value.class); - } - - NormalizableKey<?> key = (NormalizableKey<?>) reference; - return key.getMaxNormalizedKeyLen(); - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return keyBytes < getNormalizeKeyLen(); - } - - @Override - public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - NormalizableKey<?> key = (NormalizableKey<?>) record; - key.copyNormalizedKey(target, offset, numBytes); - } - - @Override - public boolean invertNormalizedKey() { - return !ascendingComparison; - } - - @Override - public TypeComparator<T> duplicate() { - return new ValueComparator<T>(ascendingComparison, type); - } - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - this.kryo.setAsmEnabled(true); - this.kryo.register(type); - } - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - target[index] = record; - return 1; - } - - @SuppressWarnings("rawtypes") - @Override - public TypeComparator[] getFlatComparators() { - return comparators; - } - - // -------------------------------------------------------------------------------------------- - // unsupported normalization - // -------------------------------------------------------------------------------------------- - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java deleted file mode 100644 index 9329866..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Value; -import org.apache.flink.util.InstantiationUtil; - -import com.esotericsoftware.kryo.Kryo; -import org.objenesis.strategy.StdInstantiatorStrategy; - -/** - * Serializer for {@link Value} types. Uses the value's serialization methods, and uses - * Kryo for deep object copies. - * - * @param <T> The type serialized. - */ -public class ValueSerializer<T extends Value> extends TypeSerializer<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> type; - - private transient Kryo kryo; - - private transient T copyInstance; - - // -------------------------------------------------------------------------------------------- - - public ValueSerializer(Class<T> type) { - this.type = Preconditions.checkNotNull(type); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public ValueSerializer<T> duplicate() { - return new ValueSerializer<T>(type); - } - - @Override - public T createInstance() { - return InstantiationUtil.instantiate(this.type); - } - - @Override - public T copy(T from) { - checkKryoInitialized(); - - return KryoUtils.copy(from, kryo, this); - } - - @Override - public T copy(T from, T reuse) { - checkKryoInitialized(); - - return KryoUtils.copy(from, reuse, kryo, this); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T value, DataOutputView target) throws IOException { - value.write(target); - } - - @Override - public T deserialize(DataInputView source) throws IOException { - return deserialize(createInstance(), source); - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - reuse.read(source); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - if (this.copyInstance == null) { - this.copyInstance = InstantiationUtil.instantiate(type); - } - - this.copyInstance.read(source); - this.copyInstance.write(target); - } - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - this.kryo.setAsmEnabled(true); - this.kryo.register(type); - } - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return this.type.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ValueSerializer) { - ValueSerializer<?> other = (ValueSerializer<?>) obj; - - return other.canEqual(this) && type == other.type; - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof ValueSerializer; - } -} 
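[Editorial note] The two Value-based runtime classes removed above (ValueComparator and ValueSerializer) never inspect field layout themselves: serialization is delegated to the type's own write(DataOutputView)/read(DataInputView) methods, while a Kryo instance is created lazily and used only for deep copies in copy() and setReference(). The sketch below illustrates the contract a user type has to fulfil for these classes. It is not part of the removed code; TemperatureValue and its field are hypothetical names, and it assumes flink-java of this vintage (with its Kryo dependency) on the classpath.

    // Hypothetical Value type; class and field names are illustrative only.
    import java.io.IOException;

    import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.types.Value;

    public class TemperatureValue implements Value, Comparable<TemperatureValue> {

        private static final long serialVersionUID = 1L;

        private double celsius;

        public TemperatureValue() {}               // no-arg constructor, needed for reflective instantiation

        public TemperatureValue(double celsius) {
            this.celsius = celsius;
        }

        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeDouble(celsius);              // ValueSerializer.serialize() delegates to this method
        }

        @Override
        public void read(DataInputView in) throws IOException {
            celsius = in.readDouble();             // ValueSerializer.deserialize(reuse, source) delegates to this method
        }

        @Override
        public int compareTo(TemperatureValue other) {
            return Double.compare(celsius, other.celsius);   // used by ValueComparator.compare()
        }

        public static void main(String[] args) {
            ValueSerializer<TemperatureValue> serializer = new ValueSerializer<>(TemperatureValue.class);

            // copy() goes through the lazily created Kryo instance, not through write()/read()
            TemperatureValue original = new TemperatureValue(21.5);
            TemperatureValue copy = serializer.copy(original);

            System.out.println(copy.compareTo(original));    // 0 -> the copy is value-equal
        }
    }

A type that should also support normalized-key sorting would additionally implement org.apache.flink.types.NormalizableKey, which is what ValueComparator.supportsNormalizedKey() checks for.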
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java deleted file mode 100644 index a03369a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.types.NormalizableKey; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.io.Writable; - -import com.esotericsoftware.kryo.Kryo; -import org.objenesis.strategy.StdInstantiatorStrategy; - -public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> { - - private static final long serialVersionUID = 1L; - - private Class<T> type; - - private final boolean ascendingComparison; - - private transient T reference; - - private transient T tempReference; - - private transient Kryo kryo; - - @SuppressWarnings("rawtypes") - private final TypeComparator[] comparators = new TypeComparator[] {this}; - - public WritableComparator(boolean ascending, Class<T> type) { - this.type = type; - this.ascendingComparison = ascending; - } - - @Override - public int hash(T record) { - return record.hashCode(); - } - - @Override - public void setReference(T toCompare) { - checkKryoInitialized(); - - reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type)); - } - - @Override - public boolean equalToReference(T candidate) { - return candidate.equals(reference); - } - - @Override - public int compareToReference(TypeComparator<T> referencedComparator) { - T otherRef = ((WritableComparator<T>) referencedComparator).reference; - int comp = otherRef.compareTo(reference); - return ascendingComparison ? comp : -comp; - } - - @Override - public int compare(T first, T second) { - int comp = first.compareTo(second); - return ascendingComparison ? 
comp : -comp; - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - ensureReferenceInstantiated(); - ensureTempReferenceInstantiated(); - - reference.readFields(firstSource); - tempReference.readFields(secondSource); - - int comp = reference.compareTo(tempReference); - return ascendingComparison ? comp : -comp; - } - - @Override - public boolean supportsNormalizedKey() { - return NormalizableKey.class.isAssignableFrom(type); - } - - @Override - public int getNormalizeKeyLen() { - ensureReferenceInstantiated(); - - NormalizableKey<?> key = (NormalizableKey<?>) reference; - return key.getMaxNormalizedKeyLen(); - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return keyBytes < getNormalizeKeyLen(); - } - - @Override - public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - NormalizableKey<?> key = (NormalizableKey<?>) record; - key.copyNormalizedKey(target, offset, numBytes); - } - - @Override - public boolean invertNormalizedKey() { - return !ascendingComparison; - } - - @Override - public TypeComparator<T> duplicate() { - return new WritableComparator<T>(ascendingComparison, type); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - target[index] = record; - return 1; - } - - @SuppressWarnings("rawtypes") - @Override - public TypeComparator[] getFlatComparators() { - return comparators; - } - - // -------------------------------------------------------------------------------------------- - // unsupported normalization - // -------------------------------------------------------------------------------------------- - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - // -------------------------------------------------------------------------------------------- - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - this.kryo.setAsmEnabled(true); - this.kryo.register(type); - } - } - - private void ensureReferenceInstantiated() { - if (reference == null) { - reference = InstantiationUtil.instantiate(type, Writable.class); - } - } - - private void ensureTempReferenceInstantiated() { - if (tempReference == null) { - tempReference = InstantiationUtil.instantiate(type, Writable.class); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java deleted file mode 100644 index 258d92c..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ /dev/null @@ -1,153 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -import com.esotericsoftware.kryo.Kryo; -import org.objenesis.strategy.StdInstantiatorStrategy; - -import java.io.IOException; - -public class WritableSerializer<T extends Writable> extends TypeSerializer<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> typeClass; - - private transient Kryo kryo; - - private transient T copyInstance; - - public WritableSerializer(Class<T> typeClass) { - this.typeClass = typeClass; - } - - @SuppressWarnings("unchecked") - @Override - public T createInstance() { - if(typeClass == NullWritable.class) { - return (T) NullWritable.get(); - } - return InstantiationUtil.instantiate(typeClass); - } - - - - @Override - public T copy(T from) { - checkKryoInitialized(); - - return KryoUtils.copy(from, kryo, this); - } - - @Override - public T copy(T from, T reuse) { - checkKryoInitialized(); - - return KryoUtils.copy(from, reuse, kryo, this); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T record, DataOutputView target) throws IOException { - record.write(target); - } - - @Override - public T deserialize(DataInputView source) throws IOException { - return deserialize(createInstance(), source); - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - reuse.readFields(source); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - ensureInstanceInstantiated(); - copyInstance.readFields(source); - copyInstance.write(target); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public WritableSerializer<T> duplicate() { - return new WritableSerializer<T>(typeClass); - } - - // -------------------------------------------------------------------------------------------- - - private void ensureInstanceInstantiated() { - if (copyInstance == null) { - copyInstance = createInstance(); - } - } - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - this.kryo.setAsmEnabled(true); - this.kryo.register(typeClass); - } - } - // 
-------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return this.typeClass.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof WritableSerializer) { - WritableSerializer<?> other = (WritableSerializer<?>) obj; - - return other.canEqual(this) && typeClass == other.typeClass; - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof WritableSerializer; - } -}
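[Editorial note] The Hadoop-bridge classes removed above (WritableComparator and WritableSerializer) follow the same pattern: they delegate to Writable.write(DataOutput)/readFields(DataInput) and keep a lazily initialized Kryo instance solely for deep copies, and createInstance() special-cases NullWritable, which is a singleton without a public constructor. A minimal sketch under those assumptions follows; PageVisit is a hypothetical type, and Hadoop plus flink-java of this vintage are assumed on the classpath.

    // Hypothetical Hadoop Writable; class and field names are illustrative only.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Writable;

    public class PageVisit implements Writable, Comparable<PageVisit> {

        private long timestamp;

        public PageVisit() {}                      // Hadoop Writables need a no-arg constructor

        public PageVisit(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(timestamp);              // WritableSerializer.serialize() delegates to this method
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            timestamp = in.readLong();             // WritableSerializer.deserialize(reuse, source) delegates to this method
        }

        @Override
        public int compareTo(PageVisit other) {
            return Long.compare(timestamp, other.timestamp);  // used by WritableComparator.compare()
        }

        public static void main(String[] args) {
            WritableSerializer<PageVisit> serializer = new WritableSerializer<>(PageVisit.class);

            // Deep copies go through the lazily created Kryo instance
            PageVisit copy = serializer.copy(new PageVisit(42L));
            System.out.println(copy.compareTo(new PageVisit(42L)));              // 0

            // NullWritable is a singleton, so createInstance() returns NullWritable.get()
            WritableSerializer<NullWritable> nullSerializer = new WritableSerializer<>(NullWritable.class);
            System.out.println(nullSerializer.createInstance() == NullWritable.get());  // true
        }
    }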